Declaring Priors using Loops for State Space Models

Hi all, I’m a Stan-user experimenting with PyMC for the first time.

I’ve been converting some State-Space Models from Stan to PyMC as an exercise. Today I converted one such model: a varying-intercepts model where each timepoint has its own intercept, but the prior for each intercept \alpha_t is \text{Normal}(\alpha_{t-1}, \sigma_\alpha). So we have this recursive mean structure in the varying intercepts.
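Written out, with \sigma_\varepsilon as the observation noise (sigma_irreg in the code below), the model is y_t = \alpha_t + \varepsilon_t, where \varepsilon_t \sim \text{Normal}(0, \sigma_\varepsilon) and \alpha_t \sim \text{Normal}(\alpha_{t-1}, \sigma_\alpha).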

I have fit the model in PyMC no problem using GaussianRandomWalk to capture this recursiveness:

import pymc as pm

with pm.Model() as model_2_2:
    
    # Priors on scale parameters
    sigma_irreg = pm.HalfNormal('sigma_irreg', sigma=1)
    sigma_alpha = pm.HalfNormal('sigma_alpha', sigma=1)

    # Using GaussianRandomWalk for the varying intercepts
    mu_seq = pm.GaussianRandomWalk('mu_seq', sigma=sigma_alpha, shape=len(dat))
    
    # Likelihood
    y_obs = pm.Normal('y_obs', mu=mu_seq, sigma=sigma_irreg, observed=dat['log_y'])
    
    # Go!
    trace = pm.sample(2000, chains=4, tune=1000, return_inferencedata=True)

But one thing I appreciate about the Stan syntax for this same model is that you can make the recursiveness explicit by defining priors using for loops. For example, in Stan you can do:

data {
  int<lower=1> n;
}
parameters {
  vector[n] mu;
  real<lower=0> sigma_alpha;
}
model {
  for (t in 2:n)
    mu[t] ~ normal(mu[t-1], sigma_alpha);
}

I find this Stan syntax nice for pedagogical reasons (i.e. Bayesian State Space Models can be framed as iterative Bayesian inference with drift in the latent state between each iteration, and I think a loop helps illustrate this), but I also think it is more flexible: it gives you the option to model the relationship between adjacent timepoints using other variables, to easily use other distributions for that relationship, etc.
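Either way, the implied joint prior over the intercepts factors as a Markov chain, p(\mu_{1:n}) = p(\mu_1) \prod_{t=2}^{n} \text{Normal}(\mu_t \mid \mu_{t-1}, \sigma_\alpha); the loop just writes this product out one factor at a time.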

I was able to use the for-loop approach in PyMC when defining this model for 10 observations, but for the full dataset (~200 observations) I ran into C++ compilation errors. I then tried a version that used PyTensor scan, but hit an error: ValueError: Random variables detected in the logp graph: {mu_t}.

import pymc as pm
import pytensor
import pytensor.tensor as pt

with pm.Model() as model:

    n = len(dat['log_y']) 
    y_observed = dat['log_y'] 

    # Priors for standard deviations
    sigma_level = pm.HalfNormal('sigma_level', sigma=1)
    sigma_irreg = pm.HalfNormal('sigma_irreg', sigma=1)

    # Initial value of mu
    mu_init = pm.Normal('mu_init', mu=0, sigma=1)

    # Iterative computation of timepoint-specific mu
    def step_fn(mu_tm1, sigma_level):
        mu_t = pm.Normal('mu_t', mu=mu_tm1, sigma=sigma_level)
        return mu_t

    mu, updates = pytensor.scan(fn=step_fn,
                              outputs_info=[dict(initial=mu_init)],
                              non_sequences=sigma_level,
                              n_steps=n-1)

    # Include the initial value in the mu sequence
    mu_seq = pt.concatenate([[mu_init], mu])

    # Likelihood 
    y_obs = pm.Normal('y_obs', mu=mu_seq, sigma=sigma_irreg, observed=y_observed)
    
    # Go!
    trace = pm.sample(2000, chains=4, tune=1000, return_inferencedata=True)

So I guess my question is: is it possible to declare priors using a loop in PyMC? I imagine I’m just overlooking some basic syntactic differences between Stan and PyMC. Thanks in advance for your help.

Hi Alex,

You’re 99% of the way there; you’re just missing some syntactic idiosyncrasies of PyMC. Specifically:

  1. As the error says, you can’t create new PyMC random variables in the inner function of a scan. Instead, you have to make PyTensor random variables, then tell PyMC to consider the entire sequence jointly as a single RV. To do this:

    • Use pm.Normal.dist instead of pm.Normal. This is how you get the underlying PyTensor RV.
    • Use the collect_default_updates helper function to handle the seeding of the random number generator through the scan.
    • Use pm.CustomDist to tell PyMC your whole scan sequence is a joint RV.
  2. Then, since you’re using a CustomDist, you have to split the scan out into a helper function that will be used to create a joint distribution over the sequence of hidden state(s).

Admittedly, it’s a lot of boilerplate, but put together it looks like this:

import pytensor
import pytensor.tensor as pt
import pymc as pm
from pymc.pytensorf import collect_default_updates
import numpy as np
import matplotlib.pyplot as plt
import arviz as az

# Generate artificial data
n_steps = 100
rng = np.random.default_rng(1234)
x0_true = rng.normal()
true_sigma = abs(rng.normal(scale=0.5))
true_irreg = abs(rng.normal(scale=0.5))
grw_innovations = rng.normal(scale=true_sigma, size=(n_steps,))
data = rng.normal(np.r_[x0_true, grw_innovations].cumsum()[1:], true_irreg)


# Helper function for pm.CustomDist
def statespace_dist(mu_init, sigma_level, size):
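    # Note: pm.CustomDist passes `size` to this function, but it's unused here;
    # the length of the sequence comes from `n_steps` in the enclosing scope instead.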

    def grw_step(mu_tm1, sigma_level):
        mu_t = mu_tm1 + pm.Normal.dist(sigma=sigma_level)
        return mu_t, collect_default_updates(outputs=[mu_t])

    mu, updates = pytensor.scan(fn=grw_step, 
                                outputs_info=[{"initial": mu_init}],
                                non_sequences=[sigma_level], 
                                n_steps=n_steps,
                                name='statespace',
                                strict=True)

    return mu


# PyMC Model
coords = {'time':np.arange(n_steps)}

with pm.Model(coords=coords) as model:           
    y_data = pm.MutableData('y_data', data, dims=['time'])

    mu_init = pm.Normal('mu_init', mu=0, sigma=1)
    sigma_level = pm.HalfNormal('sigma_level', sigma=1)
    sigma_irreg = pm.HalfNormal('sigma_irreg', sigma=1)

    mu = pm.CustomDist('hidden_states', 
                          mu_init,
                          sigma_level,
                          dist=statespace_dist,
                          dims=['time'])
    y_hat = pm.Normal('y_hat', mu, sigma_irreg, observed=y_data, dims=['time'])    
    idata = pm.sample()
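
One extra step that's easy to miss: the plots below use the posterior predictive, which pm.sample doesn't draw on its own, so sample it first inside the same model context:

with model:
    idata.extend(pm.sample_posterior_predictive(idata))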

Plot results:

fig, ax = plt.subplots(figsize=(14, 4))
x_grid = coords['time']

mu_data = idata.posterior.hidden_states
mu_hdi = az.hdi(mu_data).hidden_states
ax.plot(x_grid, mu_data.mean(dim=['chain', 'draw']), lw=2)
ax.fill_between(x_grid, *mu_hdi.values.T, alpha=0.5)


post_data = idata.posterior_predictive.y_hat
hdi = az.hdi(post_data).y_hat
# ax.plot(x_grid, post_data.mean(dim=['chain', 'draw']))
ax.fill_between(x_grid, *hdi.values.T, alpha=0.25, color='tab:blue')
ax.plot(data, color='k', ls='--', lw=0.5)

It’s good to work through all this to get a deep understanding of how PyMC works under the hood. But if you just want ready-to-go statespace models, you can check out the statespace module of pymc-experimental.
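
Roughly, and just as an untested sketch that borrows the same component and parameter names as the data-generation example further down this thread, fitting your local-level model with that module would look something like this (dat being the data frame from your first post):

from pymc_experimental.statespace import structural as st

mod = st.LevelTrendComponent(order=1, innovations_order=1)  # random-walk level, no trend
mod += st.MeasurementError()
ss_mod = mod.build()

with pm.Model(coords=ss_mod.coords) as ss_model:
    # Priors for the parameters the structural model expects (same names as in the example below)
    initial_trend = pm.Normal('initial_trend', dims=['trend_state'])
    sigma_trend = pm.HalfNormal('sigma_trend', dims=['trend_shock'])
    sigma_MeasurementError = pm.HalfNormal('sigma_MeasurementError', sigma=1)
    P0_diag = pm.HalfNormal('P0_diag', dims=['state'])
    P0 = pm.Deterministic('P0', pt.diag(P0_diag), dims=['state', 'state_aux'])

    ss_mod.build_statespace_graph(data=dat[['log_y']])
    idata_ss = pm.sample()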

For more examples of writing scan-based time series models though, you can check here and here.


Thanks so much for all these details, Jesse; this worked great.

I’m gonna try doing these Stan conversions using the statespace module in pymc-experimental as well; seems like that will be a fun exercise. I’ll let you know if I run into any issues.


So I’ve been working on expanding the above scan-based approach to a slightly more complicated model and have gotten myself stumped. Here’s the desired model:

\begin{aligned} y_t &= \mu_t + \varepsilon_t, \\ \mu_{t+1} &= \mu_t + \nu_t + \xi_t, \\ \nu_{t+1} &= \nu_t + \zeta_t, \\ \end{aligned} \ \text{where} \ \begin{aligned} \varepsilon_t &\sim NID(0, \sigma_\varepsilon^2), \\ \xi_t &\sim NID(0, \sigma_\xi^2), \\ \zeta_t &\sim NID(0, \sigma_\zeta^2). \end{aligned}

So we’ve added a new parameter \nu with its own drift, and added it to the drift model of \mu.

I thought I could achieve this using a scan that returns two vectors, one for \mu and one for \nu, since both variables need to keep track of their previous values. This seems fairly straightforward based on the scan docs, and I think I have done it, but I haven’t been able to get this model to sample. Here’s my code:

lags = 1
trials = len(dat)

def new_dist(x_init, v_init, sigma, sigma_v, size):
    
    def step(x_tm1, v_tm1, sigma, sigma_v):
        v = v_tm1 + pm.Normal.dist(sigma=sigma_v)
        x = x_tm1 + v_tm1 + pm.Normal.dist(sigma=sigma)
        return [x, v], collect_default_updates([x, v])
    
    ([x_vals, v_vals], updates) = pytensor.scan(
        fn=step,
        outputs_info = [dict(initial=x_init), dict(initial=v_init)],
        non_sequences=[sigma, sigma_v],
        n_steps=trials-lags,
        strict=True,
    )
    
    return x_vals 

with pm.Model() as m:
    
    # Priors
    sigma = pm.HalfNormal("sigma", sigma=.2)
    sigma_x = pm.HalfNormal("sigma_x", sigma=.2)
    sigma_v = pm.HalfNormal("sigma_v", sigma=.2)
    x_init = pm.Normal("x_init", mu = 0, sigma = 1)
    v_init = pm.Normal("v_init", mu = 0, sigma = 1)

    # Iteratively declare priors on the latent state at each timepoint
    x = pm.CustomDist(
        "new_dist",
        x_init,
        v_init,
        sigma_x,
        sigma_v,
        dist=new_dist
    )
    
    # Likelihood
    #y_hat = pm.Normal('y_hat', x, sigma, observed=dat['log_y'])   

Oddly, I’m able to sample from the prior with the above code and all looks fine, but only if I comment out the likelihood, as in the above code chunk. When I uncomment the likelihood and try to sample from the posterior, it throws an AssertionError.

I’ve looked at the scan documentation, some other examples of scan implementations, and the other scan-based state space models shared above, but remain confused by the necessary syntax here. Thanks in advance for your help!

You usually only need CustomDist for observed variables. In your case you can first draw the epsilons in batch and then pass them to a deterministic scan directly in the PyMC model.

Anyway, being able to sample the prior predictive but not the posterior should not be surprising. The first only needs to evaluate the expressions you defined; the second requires inverting them and deriving the density function, and that part is not always easy for PyMC to guess.

In this case the problem is that you have two variables, x and v, but only one is (and must be) returned, so PyMC would have to marginalize v instead of sampling it. Instead you could define two CustomDists, one for each sequence.

But again, in your case (unobserved variables) you can draw the epsilons first and then use them to construct the sequence, which requires no guessing on PyMC’s side. This should also be easier to sample, since the sampler only has to figure out the innovations, which are independent, whereas the whole sequence is not.


I’m pretty sure you can avoid the problem of two returns by writing the model in matrix form. For your equations you have a state vector x_t = \begin{bmatrix} \mu_t \\ \nu_t \end{bmatrix} and transition matrix T = \begin{bmatrix} 1 & 1 \\ 0 & 1\end{bmatrix} and a state innovation vector \epsilon_t = \begin{bmatrix} \xi_t \\ \zeta_t \end{bmatrix} with \epsilon_t \sim N \left(0, \begin{bmatrix}\sigma^2_\xi & 0 \\ 0 & \sigma^2_\zeta\end{bmatrix} \right )

That should work out to x_{t+1} = Tx_t + \epsilon_t; then you add in the observation equation y_t = Zx_t + \eta_t, where Z = \begin{bmatrix} 1 & 0\end{bmatrix} and \eta_t \sim N(0, \sigma^2_\eta). I renamed your \varepsilon to \eta since I already used epsilon : )

I think this works, but I also remember running into some challenges when I tried it a while back, so I’m curious if you have any luck.


Thanks @ricardoV94, this is super helpful. Can you say more about how to first draw the epsilons in batch? I’ve tried a few things but to no avail. For example, I tried the following, but it causes scan to throw the error TypeError: slice indices must be integers or None or have an __index__ method


lags = 1
trials = len(dat)

with pm.Model() as model_3_1:
    # Priors remain the same
    sigma = pm.HalfNormal("sigma", sigma=.2)
    sigma_x = pm.HalfNormal("sigma_x", sigma=.2)
    sigma_v = pm.HalfNormal("sigma_v", sigma=.2)
    x_init = pm.Normal("x_init", mu=0, sigma=1)
    v_init = pm.Normal("v_init", mu=0, sigma=1)
    
    # Pre-generate noise terms for x and v
    error_dist_x = pm.Normal.dist(mu = 0, sigma = sigma_x)
    error_dist_v = pm.Normal.dist(mu = 0, sigma = sigma_v)
    epsilon_x = pm.draw(error_dist_x, draws=trials)
    epsilon_v = pm.draw(error_dist_v, draws=trials)
    
    # Define the model structure
    def step(epsilon_x, epsilon_v, x_tm1, v_tm1):
        v = v_tm1 + epsilon_v
        x = x_tm1 + v_tm1 + epsilon_x
        return [x, v], collect_default_updates([x, v])
    
    # Create the state variables as deterministic, because in PyMC you can't iteratively declare random variables. 
    # These are deterministic because we sampled the error terms before! So the following code doesn't directly involve any probability distributions.
    ([x_vec, v_vec], updates) = pytensor.scan(
        fn=step,
        sequences = [epsilon_x, epsilon_v],
        outputs_info = [dict(initial=x_init), dict(initial=v_init)],
        n_steps=trials,
        strict=True,
    )
    
    x_vec = pm.Deterministic("x_vec", x_vec)
    
    # Likelihood using the constructed x sequence
    y_hat = pm.Normal('y_hat', mu=x_vec, sigma=sigma, observed=dat['log_y'])

I’m sure I’m just misunderstanding something basic.

@jessegrabowski that makes a lot of sense. The experience of trying to do this with pytensor.scan has really driven home the value of all your work on PyMCStateSpace :sweat_smile:. I’m still finding it engaging trying to puzzle this out, but will be looking at doing these models in PyMCStateSpace for sure.

You have some errors in your last model. The biggest one is that you shouldn’t use .dist anymore, since you’re just making plain old PyMC random variables outside of a scan. Here’s a fixed-up version, with some comments:

time_steps = 100
coords = {'time':np.arange(time_steps, dtype='int'),
          'state': ['level', 'trend'],
          'obs_state':['data']}

with pm.Model(coords=coords) as model_3_1:
    y_data = pm.ConstantData('y_data', data.values, dims=['time', 'obs_state'])
    
    # Vectorized state parameters
    state_sigma = pm.HalfNormal("sigma", sigma=[0.1, 0.001], dims=['state'])
    state_init = pm.Normal("x_init", mu=0, sigma=1, dims=['state'])
    
    # Pre-define noise terms for x and v
    innovations = pm.Normal('innovations', sigma=state_sigma, dims=['time', 'state'])
    
    # State transition matrix, as mentioned in post above.
    T = pt.as_tensor_variable(np.array([[1.0, 1.0], [0.0, 1.0]]))
    
    # R is the "selection matrix", we'd need it if we had states with no innovations. I include it just
    # to show you the most general possible form. 
    R = pt.eye(2)
    
    # Note that you *don't* need collect_default_updates anymore, because you're not declaring new PyTensor RVs
    # inside the scan inner function; that helper is only needed in that case. When everything is declared outside
    # the inner function, it all works well.
    def step(innovations, last_state, T, R):
        next_state = T @ last_state + R @ innovations
        return next_state
    
    # Included to show you what i'd do if you were really anti-linear algebra. I'd still use coords and dims,
    # and gather similar variables into arrays, i'd just pack and unpack them in the step
#     def step_no_matrix(innovations, last_state):
#         level, trend = last_state
#         level_innov, trend_innov = innovations
        
#         next_level = level + trend + level_innov
#         next_trend = trend + trend_innov
        
#         return pt.stack([next_level, next_trend])
    
    # Scan over the RANDOM VARIABLES we pre-defined for the hidden state innovations. They're still random, just declared 
    # before the scan begins.
    state_sequence, updates = pytensor.scan(
        fn=step,
        sequences = [innovations],
        outputs_info = [state_init],
        non_sequences=[T, R],
        strict=True,
    )
    # Z is the "design matrix", it picks out only the observed states from the hidden state matrix.
    # You could just use slicing, but this is how you'll see it written in textbooks.
    Z = pt.as_tensor_variable(np.array([[1., 0.0]]))
    
    # You are allowed to do broadcast matrix multiplication, but we need to pad out state_sequence to trigger the broadcasting (otherwise it just looks
    # like a normal matrix-matrix multiplication) then squeeze away the extra dimension.
    # Again, this is equivalent to state_sequence[:, 0], which is what you should actually do __in this case__.
    obs_mu = (Z @ state_sequence[..., None]).squeeze(-1)
    
    # Since we have measurement errors, we can just use the hidden states as the mean of a normal.
    obs_sigma = pm.Exponential('obs_sigma', 1)
    y_hat = pm.Normal('y_hat', mu=obs_mu, sigma=obs_sigma, observed=y_data, dims=['time', 'obs_state'])
    idata = pm.sample(init='jitter+adapt_diag_grad')

It samples very poorly. I generated some fake data using the statespace module here:

import pymc as pm
from pymc_experimental.statespace import structural as st
import pytensor
import pytensor.tensor as pt
import arviz as az
import numpy as np

seed = sum(map(ord, 'pymc statespace'))
rng = np.random.default_rng(seed)

m = st.LevelTrendComponent(order=2, innovations_order=2)
m += st.MeasurementError()
ss_mod = m.build()

with pm.Model(coords=ss_mod.coords) as m:
    initial_trend = pm.Normal('initial_trend', dims=['trend_state'])
    sigma_trend = pm.HalfNormal('sigma_trend', dims=['trend_shock'])
    sigma_MeasurementError = pm.HalfNormal('sigma_MeasurementError', sigma=10)
    P0_diag = pm.HalfNormal('P0_diag', dims=['state'])
    P0 = pm.Deterministic('P0', pt.diag(P0_diag), dims=['state', 'state_aux'])    
    
    ss_mod.build_statespace_graph(data=np.full((100,1), np.nan))
    prior = pm.sample_prior_predictive(random_seed=rng)

idata = ss_mod.sample_unconditional_prior(prior, random_seed=rng)

I looked through the draws for one that looked interesting; I went with draw=1 because it was bendy, with a good amount of measurement error:

data = idata.prior_observed.sel(chain=0, draw=1)
data.plot.line(x='time')

[Figure: plot of the simulated data series (chain=0, draw=1)]

So, estimation. The model basically can’t figure out anything about the innovation series. Here are the posterior estimates of the “innovations outside of scan” model. Notice the low ESS:

                 mean     sd  hdi_3%  hdi_97%  mcse_mean  mcse_sd  ess_bulk  ess_tail  r_hat
sigma[level]    0.127  0.079   0.023    0.276      0.020    0.014      13.0      41.0   1.24
sigma[trend]    0.001  0.000   0.000    0.002      0.000    0.000      14.0      82.0   1.22
x_init[level]   1.054  1.036  -0.798    3.039      0.038    0.027     770.0     569.0   1.01
x_init[trend]   2.676  0.062   2.553    2.789      0.002    0.002     822.0    1331.0   1.00
obs_sigma      34.201  2.040  30.410   38.117      0.113    0.082     369.0     354.0   1.02

Compared to the true parameter values:

ref_vals = prior.prior.sel(chain=0, draw=1)

az.plot_posterior(idata, 
                  var_names=['sigma', 'x_init', 'obs_sigma'], 
                  ref_val={'sigma':[{'state':'level', 'ref_val':ref_vals['sigma_trend'].sel(trend_shock='level').values.item()},
                                    {'state':'trend', 'ref_val':ref_vals['sigma_trend'].sel(trend_shock='trend').values.item()}],
                          'x_init':[{'state':'level', 'ref_val':ref_vals['initial_trend'].sel(trend_state='level').values.item()},
                                    {'state':'trend', 'ref_val':ref_vals['initial_trend'].sel(trend_state='trend').values.item()}],
                          'obs_sigma': [{'ref_val':ref_vals['sigma_MeasurementError'].values.item()}]});

Compared to the data series:

import matplotlib.pyplot as plt

# The posterior predictive wasn't sampled in the model block above, so draw it first
with model_3_1:
    idata.extend(pm.sample_posterior_predictive(idata))

mu = idata.posterior_predictive.y_hat.mean(dim=['chain', 'draw'])
hdi = az.hdi(idata.posterior_predictive.y_hat).y_hat

fig, ax = plt.subplots(figsize=(14,4), dpi=144)
x_grid = coords['time']
ax.plot(x_grid, data.values)
ax.plot(x_grid, mu)
ax.fill_between(x_grid, *hdi.values.squeeze().T, alpha=0.25, color='tab:orange')

This was my experience in the past with the “sample outside the scan” approach. I might be doing something wrong, but in general I think it’s quite important that you grapple with the CustomDist stuff to make time series models work.


This is super interesting and helpful, thanks for all these details Jesse.