Does the state_space model support time-dependent transition matrices?

I’m relatively new to PyMC, and I’m trying to implement a state space model whose transition matrix depends on time. So far, I’ve been following the example in this notebook. It does almost everything I need, except for the time dependence. Is there an easy way to modify it to add a time-varying transition matrix?

In case it matters, the time-dependence I need is just a deterministic function of t, not some complicated stochastic process. The actual system I want to implement is quite messy, but it would suffice to know how to implement something like:

alpha = pm.Normal("alpha", mu=0, sigma=10)
beta = pm.Normal("beta", mu=0, sigma=10)
transition = [[alpha*t + beta*(T_final - t), 0], [0, 1]]

Yes, all matrices except x0 and P0 are allowed to be time varying. This discussion goes through some of the implementation details.

Thanks for the link. Unfortunately, I’m not fluent enough with PyMC to figure out how to modify that example for my purposes.

I’ve put together a toy version of what I want to do. It’s a difference equation where one of the rate parameters fluctuates as a function of time, with some mean and variance that I would like to be able to estimate. Is this anywhere close to correct? I can’t for the life of me figure out what to put in the sections marked ???, and I suspect there are other problems besides.

import numpy as np
import matplotlib.pyplot as plt
import arviz as az
from pymc_extras.statespace.core.statespace import PyMCStateSpace
import pytensor.tensor as pt
import pymc as pm

# Stochastic transfer model

# x0' = 1 - d_rand*x0
# x1' = d_rand*x0 - x1
# d_rand ~ Logit-normal

####################
# useful functions #
####################

def sig(x):
   # sigmoid function, maps R to [0,1]
   return 1/(1+np.exp(-1*x))

#####################################
# Set up true model / generate data #
#####################################

# true model parameters
mu_true     = 2
sigma_true  = 1

# initial conditions
x_0_init = 2
x_1_init = 0

# simulation parameters
T_init   = 0
T_fin    = 10
dt       = 0.1
N        = int(np.ceil( (T_fin - T_init)/dt ))
time     = np.arange(T_init, T_fin, dt)

x_0_true = [x_0_init]
x_1_true = [x_1_init]

for t in time[1:]:
   x0 = x_0_true[-1]
   x1 = x_1_true[-1]
   d_rand = sig(np.random.normal(mu_true, sigma_true))

   x_0_true += [x0 + dt*(1 - d_rand*x0)]
   x_1_true += [x1 + dt*(d_rand*x0 - x1)]

# plot results
plt.plot(time, x_0_true)
plt.plot(time, x_1_true)
plt.xlabel('time')
plt.show()

###############################
# Set up the stochastic model #
###############################

class testModel(PyMCStateSpace):
   def __init__(self):
      k_states = 2  # size of the state vector x
      k_posdef = 1  # number of shocks (size of the state covariance matrix Q)
      k_endog = 1  # number of observed states
      super().__init__(k_endog=k_endog, k_states=k_states, k_posdef=k_posdef)

   def make_symbolic_graph(self):
      innit = self.make_and_register_variable("innit", shape=(2,))
      mu_param = self.make_and_register_variable("mu_param", shape=())
      sigma_param = self.make_and_register_variable("sigma_param", shape=())
      h = self.make_and_register_variable('h', shape=(None, self.k_endog))

      self.ssm["design", 0, 1] = 1 # fit to "x_1_true"
      self.ssm["state_intercept", 0] = 1
      self.ssm["initial_state", :] = innit
      self.ssm["transition", 0, 0] = ????
      self.ssm["transition", 0, 1] = ????
      self.ssm["transition", 1, 1] = -1

   @property
   def param_names(self):
      return ["innit", "mu_param", "sigma_param", "h"]

#################
# Fit the model #
#################

tm = testModel()

with pm.Model() as pymc_mod:
   innit       = pm.Normal("innit", mu=3, sigma=0.25, shape=(2,))
   mu_param    = pm.Normal("mu_param", mu=0, sigma=1)
   sigma_param = pm.Normal("sigma_param", mu=0, sigma=1)
   h           = pm.Normal("h", mu=mu_param, sigma=sigma_param, shape=(N,))

   tm.build_statespace_graph(data=np.asmatrix(x_1_true).T, mode="JAX")
   idata = pm.sample(
      nuts_sampler="nutpie",
      nuts_sampler_kwargs={"backend": "jax", "gradient_backend": "jax"}
   )

az.plot_trace(idata, var_names=["innit", "mu_param", "sigma_param", "h"])
plt.show()

The model you are writing has to be in the form:

\begin{align} x_t & = T_t x_{t-1} + c_t + R_t \varepsilon_t, & \varepsilon_t \sim MVN(0, Q_t) \\ y_t &= Z_t x_t + d_t + \eta_t, & \eta_t \sim MVN(0, H_t) \end{align}

I don’t know your model, and the 3 lines of comments I see aren’t very clear. I think a good first step for you will be to determine what the matrices c, d, T, Z, R, H, and Q are in your case. See here for more comments on the framework.
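
For reference, here is a rough mapping from those symbols to the self.ssm keys used by PyMCStateSpace. The keys not used elsewhere in this thread ("obs_intercept", "selection", "state_cov") are my reading of the statespace docs, so double-check them against the source:

ssm_keys = {
    "T": "transition",         # state transition matrix
    "c": "state_intercept",    # state intercept vector
    "Z": "design",             # design / observation matrix
    "d": "obs_intercept",      # observation intercept vector
    "R": "selection",          # shock selection matrix
    "Q": "state_cov",          # shock (innovation) covariance
    "H": "obs_cov",            # measurement error covariance
    "x0": "initial_state",
    "P0": "initial_state_cov",
}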

In this example, c = \begin{bmatrix} \Delta t \\ 0 \end{bmatrix}, Z = \begin{bmatrix} 0 & 1 \end{bmatrix}, and T=\begin{bmatrix} -d_{rand}\cdot\Delta t & 0 \\ d_{rand}\cdot\Delta t & -1 \end{bmatrix}. All other matrices are zero. (Aside: in the past, I’ve gotten an error message whenever I try to set k_posdef = 0, so I set it to 1 even though there are no shocks.) \Delta t is a constant (a fixed time increment) and d_{rand} is a rate constant that I want to fluctuate with time. d_{rand} has to remain between 0 and 1, so I’m using a Logit-normal (which I’ve since noticed is already implemented, so you can ignore my clumsy attempts to build it out of a normal distribution).

The real model I care about is an epidemiological model with a fluctuating diagnosis rate. So if it helps, you can think of x_0 as the undiagnosed state, x_1 as the diagnosed state, and d_{rand} as the diagnosis rate. The important thing is that the source of randomness is not measurement noise (we know how many diagnoses there have been to high accuracy), but rather randomness intrinsic to one of the model’s transition rates.

Here’s a slightly less bone-headed version, with more consistent names and without the clumsy Logit-normal stuff.

###############################
# Set up the stochastic model #
###############################

class testModel(PyMCStateSpace):
   def __init__(self):
      k_states = 2  # size of the state vector x
      k_posdef = 1  # number of shocks (size of the state covariance matrix Q)
      k_endog = 1  # number of observed states
      super().__init__(k_endog=k_endog, k_states=k_states, k_posdef=k_posdef)

   def make_symbolic_graph(self):
      innit = self.make_and_register_variable("innit", shape=(2,))
      d_rand = self.make_and_register_variable('d_rand', shape=(None, self.k_endog))

      self.ssm["initial_state", :] = innit

      self.ssm["design", 0, 1] = 1 # fit to "x_1_true"

      self.ssm["state_intercept", 0] = dt # single source term for x_0

      self.ssm["transition", 0, 0] = -dt*d_rand # tarnsfer from x_0 to x_1
      self.ssm["transition", 0, 1] = dt*d_rand # tarnsfer from x_0 to x_1

      self.ssm["transition", 1, 1] = -1 # exponential decay term for x_1

   @property
   def param_names(self):
      return ["innit", "d_rand"]

#################
# Fit the model #
#################

tm = testModel()

with pm.Model() as pymc_mod:
   innit       = pm.Normal("innit", mu=3, sigma=0.25, shape=(2,))
   d_rand      = pm.LogitNormal("d_rand", mu=0, sigma=1, shape=(N,))

   tm.build_statespace_graph(data=np.asmatrix(x_1_true).T, mode="JAX")
   idata = pm.sample(
      nuts_sampler="nutpie",
      nuts_sampler_kwargs={"backend": "jax", "gradient_backend": "jax"}
   )

az.plot_trace(idata, var_names=["innit", "d_rand"])
plt.show()

When it tries to evaluate the self.ssm["transition", 0, 0] -= dt*d_rand line, I get the following error:

TypeError: Trying to increment a 0-dimensional subtensor with a 2-dimensional value.

Presumably because self.ssm["transition", 0, 0] is expecting a scalar, not a vector, but I can’t figure out the correct notation.

I think I get it now. Thanks for clarifying the setup!

The index ["transition", 0, 0] is a single element in a matrix, yeah. When you registered d_rand, you said it has shape (None, 2). From your equations, it looks like d_rand should just be a scalar, so the shape should be (None, 1) or (None,), depending on how pedantic you want to be. To get the time dimension in there, you need to explicitly index the time dimension on transition, e.g. self.ssm["transition", :, 0, 0] = d_rand[:, 0] to set a sequence of transition matrices to the first element of the d_rand vector at every timestep.

I also recommend you pass dt in the __init__ method, save it in self.dt, and use this in the make_symbolic_graph method.

Thanks! That’s exactly what I was looking for.

Unfortunately, now I’m getting a new, different error message (progress!), which I assume is caused by me incorrectly assigning a shape somewhere. But just in case it’s an actual bug, and not just me doing things wrong, here’s the error.

Traceback (most recent call last):
  File "...\Documents\Programming\pyMC\stochastic_rate.py", line 102, in <module>
    idata = pm.sample(
            ^^^^^^^^^^
  File "C:\Users\J\miniconda3\envs\pymc_env\Lib\site-packages\pymc\sampling\mcmc.py", line 781, in sample
    return _sample_external_nuts(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\J\miniconda3\envs\pymc_env\Lib\site-packages\pymc\sampling\mcmc.py", line 342, in _sample_external_nuts
    idata = nutpie.sample(
            ^^^^^^^^^^^^^^
  File "C:\Users\J\miniconda3\envs\pymc_env\Lib\site-packages\nutpie\sample.py", line 636, in sample
    result = sampler.wait()
             ^^^^^^^^^^^^^^
  File "C:\Users\J\miniconda3\envs\pymc_env\Lib\site-packages\nutpie\sample.py", line 388, in wait
    self._sampler.wait(timeout)
RuntimeError: All initialization points failed

Caused by:
    Logp function returned error: Python error: ValueError: Invalid shape: Expected (1,) but got (100,)

The (100,) is coming from the length of the time series, so it’s either associated with the data or with d_rand.


Does pm.sample_prior_predictive() work? Also check what model.debug() says.
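
For example (a minimal sketch, assuming the model context from your earlier snippet is called pymc_mod):

with pymc_mod:
    prior = pm.sample_prior_predictive()

# evaluates each variable at the initial point and reports non-finite logps or shape problems
pymc_mod.debug()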

My guess is that it’s something to do with broadcasting. Can you share the model code?

Edit: Looking at the code in the post above, your model still has several problems:

  • You need to define P0, the initial covariance matrix over the hidden states (to be estimated via Kalman filtering)
  • There are no sources of stochastic variation in your model. This is either hidden state innovations (defined via Q) or measurement errors (defined via H). If you have neither, the KF is degenerate (called a “stochastic singularity”).
  • The way I wrote things, it’s necessary to pass a complete time-varying matrix to self.ssm, rather than using the [:, 0, 0] notation.

A corrected model looks as follows. I also implemented all the properties and coords, which are required for doing post-estimation stuff:

from pymc_extras.statespace.core import PyMCStateSpace
import pytensor.tensor as pt
import pytensor
from pymc.model.transform.optimization import freeze_dims_and_data
from pymc_extras.statespace.models.utilities import make_default_coords

###############################
# Set up the stochastic model #
###############################

class testModel(PyMCStateSpace):
    def __init__(self, dt=1, n_timesteps=100):
        k_states = 2  # size of the state vector x
        k_posdef = 0  # number of shocks (size of the state covariance matrix Q)
        k_endog = 1  # number of observed states
        
        self.dt = dt
        self.n_timesteps = n_timesteps
            
        super().__init__(k_endog=k_endog, k_states=k_states, k_posdef=k_posdef)
        
        
    def make_symbolic_graph(self):
        innit = self.make_and_register_variable("innit", shape=(self.k_states,))
        d_rand = self.make_and_register_variable('d_rand', shape=(self.n_timesteps, self.k_endog))
        measurement_error = self.make_and_register_variable('measurement_error', shape=(self.k_endog,))
        
        P0 = self.make_and_register_variable('P0', shape=(self.k_states, self.k_states))

        self.ssm["initial_state", :] = innit
        self.ssm['initial_state_cov'] = P0

        self.ssm["design", 0, 1] = 1 # fit to "x_1_true"

        self.ssm["state_intercept", 0] = self.dt # single source term for x_0
        
        T = pt.zeros((self.n_timesteps, self.k_states, self.k_states))
        T = T[:, 0, 0].set(-self.dt * d_rand[:, 0]) # transfer from x_0 to x_0
        T = T[:, 0, 1].set(self.dt * d_rand[:, 0]) # transfer from x_0 to x_1
        T = T[:, 1, 1].set(-1) # exponential decay term for x_1
        
        self.ssm['transition'] = T
                
        self.ssm['obs_cov', 0, 0] = measurement_error[0]

    @property
    def param_names(self):
        return ["innit", "d_rand", "measurement_error", "P0"]
    
    @property
    def state_names(self):
        return ['x1', 'x2']
    
    @property
    def shock_names(self):
        return []
    
    @property
    def observed_states(self):
        return ['x1']
    
    @property
    def coords(self):
        return make_default_coords(self)
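
For completeness, a sketch of what the fitting block might look like with the new parameter list (the priors here are illustrative choices on my part, and x_1_true is the simulated series from your first post):

import numpy as np
import pymc as pm
import pytensor.tensor as pt

tm = testModel(dt=0.1, n_timesteps=100)

with pm.Model() as pymc_mod:
    innit = pm.Normal("innit", mu=3, sigma=0.25, shape=(2,))
    P0_diag = pm.Gamma("P0_diag", alpha=2, beta=1, shape=(2,))
    P0 = pm.Deterministic("P0", pt.diag(P0_diag))
    d_rand = pm.LogitNormal("d_rand", mu=0, sigma=1, shape=(100, 1))
    measurement_error = pm.Exponential("measurement_error", 1, shape=(1,))

    tm.build_statespace_graph(data=np.asmatrix(x_1_true).T, mode="JAX")
    idata = pm.sample(
        nuts_sampler="nutpie",
        nuts_sampler_kwargs={"backend": "jax", "gradient_backend": "jax"}
    )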

This model will fit, but doing so is hopeless unless you impose some structure on the sequence of d_rand. Otherwise each parameter is informed by only 1 observation. In all likelihood you will just get the priors back.

Oh, wait… is d_rand, as currently coded, a vector of several independent random variables, each of which has a prior that can update independently? I want them all to share the same distribution.

What would it mean for them to “share the same distribution”? If you want them to change over time, of course they all have to have their own prior, otherwise there’s nothing to update. You could make them hierarchical by adding a learnable mu/sigma – that would match what you did in your numpy data-generation code, and it would help by allowing information sharing between the time steps. You would likely be able to learn the hyperpriors, but nothing about the actual time dynamics (though I might be surprised). Or you could think about a single distribution generating the *sequence* of values, such as a random walk.
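
A sketch of the random-walk option, on the logit scale so the rate stays in (0, 1) (the variable names and the 0.1 step scale are just illustrative; N is the length of the series, as in your code):

with pm.Model() as m:
    # innovations of the walk; their scale controls how quickly the rate can drift
    steps = pm.Normal("d_rand_steps", 0, 0.1, shape=(N,))
    # cumulative sum gives a Gaussian random walk on the logit scale
    logit_d_rand = pm.Deterministic("logit_d_rand", steps.cumsum())
    # squash back to (0, 1) to get the rate sequence
    d_rand = pm.Deterministic("d_rand", pm.math.sigmoid(logit_d_rand))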

Like rolling the same die multiple times. That would give you a sequence of numbers that change from one roll to the next, but they’re all drawn from the same probability distribution.

That’s exactly what I want: a sequence. For each sample of the state space model, I want to draw N samples from a single distribution and arrange them into a sequence to form d_rand.

I think you’re confusing two things. Your model is not as simple as rolling a die, so I need to expand the example a bit to get at what is going on. You have a die, which you do not observe, and two coins, one biased towards heads and one biased towards tails. You roll the die. On an odd number, you then flip the heads-biased coin and record the result. On an even number, you flip the tails-biased coin and record the result. You observe the sequence H H H T T H T H H T T H.

A model now needs to learn:

  1. The distribution over the faces of the die (your focus is here)
  2. The distribution over the actual (but unobserved!) outcomes of the die roll associated with each coin flip (I’m trying to draw your attention here)
  3. The probabilities associated with each weighted coin

For any sequence, you must learn all of these things jointly. You can’t just do (1) and then stop; it doesn’t make sense. Right now, you’re not learning (1) at all, because there is no joint structure in the prior for d_rand: they’re just iid LogitNormal(0, 1) draws. Alternatively, you could think of this as a dogmatic prior – you’re insisting that the shared structure is mean 0, sigma 1. You could, however, link them together with shared hyperpriors, for example:

with pm.Model() as m:
    d_rand_loc = pm.Normal('d_rand_loc', 0, 3)
    d_rand_scale = pm.Exponential('d_rand_scale', 1)
    d_rand = pm.LogitNormal('d_rand', mu=d_rand_loc, sigma=d_rand_scale, dims=['time', 'state'])

Now you’re learning the equivalent of (1) – the location and scale parameters are the common distribution shared by all the realizations. But you still need to update your beliefs about the sequence realizations – this is the purpose of d_rand itself.


Okay, you’ve convinced me. For now, I will go back to a more conventional approach, namely treating the transition rates as constant and moving the stochasticity to the obs_cov and state_cov terms.

Unfortunately, I’m still getting hung up on the basic syntax. Here’s another example, based on the forward Euler discretization of a forced spring with damping (x is position, v is velocity). The time-varying component here is the forcing term, which should show up in state_intercept. There are six versions, trying various combinations of the syntax you provided. I can only get the constant-forcing versions to work.

import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import odeint
import arviz as az
from pymc_extras.statespace.core.statespace import PyMCStateSpace
import pytensor.tensor as pt
import pymc as pm

# Damped Spring model

# x'' + 2ax' + (w^2)x = f

##############################
# Set up true model and data #
##############################

# choose versions 0-5
VERSION = 0

# true model parameters
a_true      = 0.3
w_true      = 0.7
sigma_true  = 0.03

# initial conditions
x_init = 2
v_init = 0.5

# simulation parameters
T_init   = 0
T_fin    = 10
dt       = 0.1
N        = int(np.ceil( (T_fin - T_init)/dt ))
time     = np.arange(T_init, T_fin, dt)

def forcing_fun(t):
   if VERSION in [0, 1, 2]:
      return 1
   if VERSION in [3, 4, 5]:
      return np.sin(t)

def exact_model(y, t):
   x = y[0]
   v = y[1]

   # the model equations
   dxdt = v
   dvdt = -2*a_true*v - w_true**2*x + forcing_fun(t)

   return [dxdt, dvdt]

# solve ODE
y = odeint(exact_model, [x_init, v_init], time)

x_true = y[:,0] + np.random.normal(0, sigma_true, size=N)
v_true = y[:,1] + np.random.normal(0, sigma_true, size=N)

# plot results
plt.plot(time, x_true)
plt.plot(time, v_true)
plt.xlabel('time')
plt.show()

###############################
# Set up the stochastic model #
###############################

class SpringModel(PyMCStateSpace):
   def __init__(self, dt=1, n_timesteps=100):
      # defines the "shape" of the matrices & vectors
      k_states = 2  # size of the state vector x
      k_posdef = 0  # number of shocks (size of the state covariance matrix Q)
      k_endog = 1  # number of observed states

      self.dt = dt
      self.n_timesteps = n_timesteps

      super().__init__(k_endog=k_endog, k_states=k_states, k_posdef=k_posdef)

   def make_symbolic_graph(self):
      # sets the matrix & vector values

      x0 = self.make_and_register_variable("x0", shape=(2,))
      ar_params = self.make_and_register_variable("ar_params", shape=(2,)) # [w, a]
      sigma_x = self.make_and_register_variable("sigma_x", shape=())

      self.ssm["design", 0, 0] = 1
      self.ssm["obs_cov", 0, 0] = sigma_x**2

      self.ssm["initial_state", :] = x0

      self.ssm["transition", :, :] = np.eye(2)
      self.ssm["transition", 0, 1] += self.dt
      self.ssm["transition", 1, 0] -= self.dt*ar_params[0]**2
      self.ssm["transition", 1, 1] -= self.dt*2*ar_params[1]

      ##############
      # This works #
      ##############

      # version 0: constant forcing
      if VERSION == 0:
         self.ssm["state_intercept", 1] = self.dt

      # version 1: constant forcing, with time index included
      if VERSION == 1:
         self.ssm["state_intercept", :, 1] = self.dt

      ################
      # This doesn't #
      ################

      # version 2: explicitly declaring the size of the time dimension

      if VERSION == 2:
         S_I = pt.zeros((self.n_timesteps, self.k_states))
         S_I = S_I[:, 1].set(self.dt)
         self.ssm["state_intercept"] = S_I

      # versions 3 and 4 introduce a new variable to be used when the forcing term isn't constant

      if VERSION == 3:
         forcing = self.make_and_register_variable("forcing", shape=(self.n_timesteps,))
         self.ssm["state_intercept", :, 1] = self.dt*forcing

      if VERSION == 4:
         forcing = self.make_and_register_variable("forcing", shape=(self.n_timesteps,))
         S_I = pt.zeros((self.n_timesteps, self.k_states))
         S_I = S_I[:, 1].set(self.dt*forcing)
         self.ssm["state_intercept"] = S_I

      # version 5: skipping the deterministic variable as an intermediary
      if VERSION == 5:
         self.ssm["state_intercept", :, 1] = self.dt*pt.as_tensor_variable(forcing_fun(time))


   # All parameter names created with "make_and_register_variable" must
   # be registered in a class property called param_names.
   @property
   def param_names(self):
      if VERSION in [0, 1, 2, 5]:
         return ["x0", "ar_params", "sigma_x"]
      if VERSION in [3, 4]:
         return ["x0", "ar_params", "sigma_x", "forcing"]

#################
# Fit the model #
#################

sm = SpringModel(dt=dt, n_timesteps=N)

with pm.Model() as pymc_mod:
   x0 = pm.Normal("x0", mu=3, sigma=0.25, shape=(2,))
   ar_params = pm.Gamma("ar_params", alpha=2, beta=0.5, shape=(2,))
   sigma_x = pm.Exponential("sigma_x", 1)

   if VERSION in [3, 4]:
      forcing = pm.Deterministic("forcing", pt.as_tensor_variable(forcing_fun(time)))

   sm.build_statespace_graph(data=np.asmatrix(x_true).T, mode="JAX")
   idata = pm.sample(
      nuts_sampler="nutpie",
      nuts_sampler_kwargs={"backend": "jax", "gradient_backend": "jax"}
   )

az.plot_trace(idata, var_names=["x0", "ar_params", "sigma_x"])
plt.show()

az.plot_posterior(
   idata, var_names=["x0", "ar_params", "sigma_x"], ref_val=[x_init, v_init, w_true, a_true, sigma_true]
)
plt.show()

Versions 2 and 4 first throw a warning:

UserWarning: Skipping CheckAndRaise Op (assertion: The first dimension of a time varying matrix (the time dimension) must be equal to the first dimension of the data (the time dimension).) as JAX tracing would remove it.

Before hanging for a long time and then finally crashing with:

RuntimeError: All initialization points failed

Caused by:
Logp function returned error: Python error: IndexError: Array slice indices must have static start/stop/step to be used with NumPy indexing syntax. Found slice(None, Traced<ShapedArray(int8)>with, None). To index a statically sized array at a dynamic position, try lax.dynamic_slice/dynamic_update_slice (JAX does not support dynamically sized arrays within JIT compiled functions).

Versions 3 and 5 seem most promising, in that the error looks like it’s just a matter of me having specified a shape/formatted a matrix incorrectly somewhere. Unfortunately, I’m unable to locate the problem.

ERROR (pytensor.graph.rewriting.basic): Rewrite failure due to: constant_folding
ERROR (pytensor.graph.rewriting.basic): node: SpecifyShape([ 0. … .45753589], 1)
ERROR (pytensor.graph.rewriting.basic): TRACEBACK:
ERROR (pytensor.graph.rewriting.basic): Traceback (most recent call last):

(lots of text)

AssertionError: SpecifyShape: dim 0 of input has shape 100, expected 1.

Any advice on how to fix this would be appreciated.

It seems like you can get what you want with clever state design, rather than time-varying matrices. The state_intercept can always be subsumed into the state vector. Say you have a system x_t = Tx_{t-1} and you want to add a constant factor f at every timestep, you can define:

\begin{align} x_{2,t} &= \begin{bmatrix} x_t \\ f \end{bmatrix} \\ T_2 &= \begin{bmatrix} T & 1 \\ 0_{1 \times n} & 1 \end{bmatrix} \end{align}

Multiply it out: the top block of T_2 x_{2,t} is T x_t + f, and the last entry stays f.
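
A quick numpy check of that identity, with an arbitrary T, x, and f (nothing here is specific to the model in this thread):

import numpy as np

n = 2
T = np.array([[0.9, 0.1], [0.0, 0.8]])   # arbitrary transition matrix
x = np.array([1.0, 2.0])                 # current state
f = 0.5                                  # constant to add at every step

x2 = np.concatenate([x, [f]])            # augmented state [x, f]
T2 = np.block([
    [T,                np.ones((n, 1))],
    [np.zeros((1, n)), np.ones((1, 1))],
])

print(T2 @ x2)              # first n entries are T @ x + f, the last entry is f again
print(np.r_[T @ x + f, f])  # identical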

This is totally equivalent, but the power is that by thinking about the constant as a “state”, you can start to build in time-varying dynamics that allow it to change as a function of time. For example, here it is given a deterministic trend with “velocity” \Delta t:

\begin{align} x_{3,t} &= \begin{bmatrix} x_t \\ f_t \\ \Delta t \end{bmatrix} \\ T_3 &= \begin{bmatrix} T & 1 & 0 \\ 0_{1 \times n} & 1 & 1 \\ 0_{1 \times n} & 0 & 1 \end{bmatrix} \end{align}

So that now the “intercept” f_t has a transition process f_t = f_{t-1} + \Delta t, achieving the time-varying forcing term. You didn’t have this case, but it shows how you can handle the process. If you want f_t = \sin(t), you can use Fourier bases in the same way. You will need two states, f_t and f^\star_t, with the following transition sub-matrix:

S = \begin{bmatrix} \cos(1) & \sin(1) \\ -\sin(1) & \cos(1) \end{bmatrix}

The ones arise because you want Fourier bases with period s = 2\pi, and the formula for the angle is \frac{2\pi j}{s}, where j is the number of basis vectors you’re generating (just 1 in this case). The full transition matrix will just be:

T_4 = \begin{bmatrix} T & 1_{1 \times 2} \\ 0_{2 \times n} & S \end{bmatrix}

And x_{4,t} = \begin{bmatrix} x_t \\ f_t \\ f^\star_t \end{bmatrix}, with f_0 = -0.5 and f^\star_0 = 0.5
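
A small numpy illustration of why iterating that rotation block generates a sinusoid. Here I start the two Fourier states at [0, 1], which gives f_t = \sin(t) exactly; other initial values just change the phase and amplitude:

import numpy as np

S = np.array([[ np.cos(1.0), np.sin(1.0)],
              [-np.sin(1.0), np.cos(1.0)]])   # rotate by 1 radian per step

f = np.array([0.0, 1.0])   # [f_0, f*_0]
trace = []
for t in range(20):
    trace.append(f[0])     # record f_t before stepping
    f = S @ f

print(np.allclose(trace, np.sin(np.arange(20))))   # True: f_t = sin(t)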

A broader point is that we don’t really have code to do an EKF (extended Kalman filter). It wouldn’t be super hard, and we could even compute the Jacobians of the system for the user. As things stand you can do it by hand (which is basically what you’re doing here). But the main pain will come from the predict function, which will still use T x_t. In that context, it should be replaced with the RK4 approximation. I guess you can mitigate this with a small \Delta t for the moment.

Does that mean I can’t specify the time-varying component of state_intercept (or transition, or any of these other matrices) using an array of numbers? I figured I was just using the wrong syntax. All I want to do at this point is figure out how to write the right-hand side of:

v = [v_0, v_1, ..., v_T]

self.ssm["transition", :, 0, 1] = something involving v

I thought pt.as_tensor_variable(v) would be the right thing, but apparently not.

What does “an array of numbers” mean? You can certainly plug in constant values; that’s done all the time to put in ones and zeros.

As you saw above, the current implementation requires you to pass the full time-varying matrix when you assign it to ssm. In your example, you should either:

  1. First do self.ssm["transition"] = pt.zeros((T, k, k)), then use the slicing syntax
  2. Build T first (as I showed above) then do self.ssm["transition"] = T.

The point of my last post was to illustrate that you can get what you want in a computationally more effective way.
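
Concretely, for the v example above, option 2 looks something like this (a sketch mirroring the pt.zeros / .set pattern from the corrected model earlier in the thread; the sine array is just a stand-in for your numbers):

import numpy as np
import pytensor.tensor as pt

n_timesteps, k_states = 100, 2
dt = 0.1

# any fixed array of numbers, one value per timestep
v = pt.as_tensor_variable(np.sin(np.arange(n_timesteps) * dt))

T = pt.zeros((n_timesteps, k_states, k_states))
T = T[:, 0, 1].set(v)   # element (0, 1) of the transition matrix now varies over time

# then, inside make_symbolic_graph, assign the complete (time, k, k) matrix at once:
# self.ssm["transition"] = T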

I’ve tried various versions of that with all kinds of minor tweaks to the syntax, but every time I do, I keep getting the same error ending with

RuntimeError: All initialization points failed
Caused by:
Logp function returned error: Python error: IndexError: Array slice indices must have static start/stop/step to be used with NumPy indexing syntax. Found slice(None, Traced<ShapedArray(int8)>with<DynamicJaxprTrace(level=1/0)>, None). To index a statically sized array at a dynamic position, try lax.dynamic_slice/dynamic_update_slice (JAX does not support dynamically sized arrays within JIT compiled functions).
thread ‘nutpie-worker-2’ panicked at D:\bld\nutpie_1722020327210_build_env.cargo\registry\src\index.crates.io-6f17d22bba15001f\nuts-rs-0.12.1\src\sampler.rs:576:18:
Could not send sampling results to main thread.: SendError { … }
note: run with RUST_BACKTRACE=1 environment variable to display a backtrace

If I remove the JAX stuff and leave the settings on their defaults, then I get a different error ending with

File “C:\Users-\AppData\Roaming\Python\Python312\site-packages\pytensor\link\numba\dispatch\basic.py”, line 358, in use_optimized_cheap_pass
old_pm = context._mpm_cheap
^^^^^^^^^^^^^^^^^^
AttributeError: ‘JITCPUCodegen’ object has no attribute ‘_mpm_cheap’

The following model runs for me without issue:

import numpy as np
import pandas as pd
import pytensor
import pytensor.tensor as pt
import pymc as pm
from pymc.model.transform.optimization import freeze_dims_and_data
from pymc_extras.statespace.models.utilities import make_default_coords
from pymc_extras.statespace.core import PyMCStateSpace

class SpringModel(PyMCStateSpace):
    def __init__(self, dt=1, n_timesteps=100, estimate_force=False):
        # defines the "shape" of the matrices & vectors
        k_states = 2  # size of the state vector x
        k_posdef = 0  # number of shocks (size of the state covariance matrix Q)
        k_endog = 1  # number of observed states

        self.dt = dt
        self.n_timesteps = n_timesteps
        self.estimate_force = estimate_force

        super().__init__(k_endog=k_endog, k_states=k_states, k_posdef=k_posdef)

    def make_symbolic_graph(self):
        # sets the matrix & vector values

        x0 = self.make_and_register_variable("x0", shape=(2,))
        P0 = self.make_and_register_variable('P0', shape=(2, 2))
        
        self.ssm['initial_state_cov'] = P0
        
        ar_params = self.make_and_register_variable("ar_params", shape=(2,)) # [w, a]
        sigma_x = self.make_and_register_variable("sigma_x", shape=())
        

        self.ssm["design", 0, 0] = 1
        self.ssm["obs_cov", 0, 0] = sigma_x**2

        self.ssm["initial_state", :] = x0

        self.ssm["transition", :, :] = np.eye(2)
        self.ssm["transition", 0, 1] += self.dt
        self.ssm["transition", 1, 0] -= self.dt*ar_params[0]**2
        self.ssm["transition", 1, 1] -= self.dt*2*ar_params[1]

        # version 2: explicitly declaring the size of the time dimension
        S_I = pt.zeros((self.n_timesteps, self.k_states))
        
        if self.estimate_force:
            forcing = self.make_and_register_variable("forcing", shape=(self.n_timesteps,))
            S_I = S_I[:, 1].set(self.dt*forcing)
        
        else:
            S_I = S_I[:, 1].set(self.dt)
        
        self.ssm["state_intercept"] = S_I
        
    @property
    def param_names(self):
        params = ["x0", "P0", "ar_params", "sigma_x"]
        if self.estimate_force:
            params += ['forcing']
        
        return params
    
    @property
    def state_names(self):
        return ['x1', 'x2']
    
    @property
    def shock_names(self):
        return []
    
    @property
    def observed_states(self):
        return ['x1']
    
    @property
    def coords(self):
        return make_default_coords(self)



dt = 0.1
N = 100
data = pd.DataFrame(np.nan, index=pd.RangeIndex(N), columns=['x1'])
sm = SpringModel(dt=dt, n_timesteps=N, estimate_force=True)

with pm.Model() as pymc_mod:
    x0 = pm.Normal("x0", mu=3, sigma=0.25, shape=(2,))
    P0_diag = pm.Gamma('P0_diag', alpha=2, beta=1, shape=(2,))
    P0 = pm.Deterministic('P0', pt.diag(P0_diag))
    
    ar_params = pm.Gamma("ar_params", alpha=2, beta=0.5, shape=(2,))
    sigma_x = pm.Exponential("sigma_x", 1)

    # Random walk prior on forcing term
    forcing = pm.Normal('forcing', 0, 0.1, shape=(N,)).cumsum()

    sm.build_statespace_graph(data=data, mode="JAX")
    idata = pm.sample(
        nuts_sampler="nutpie",
        nuts_sampler_kwargs={"backend": "jax", "gradient_backend": "jax"}
    )

Your first error is a sampling error. It is likely because you did not define P0, so the Kalman filter state covariance was being initialized as the zero matrix.

The second error is a numba error. It was reported here. You can fix it by downgrading to numba==0.60.0, or by waiting a couple of days until we bump the pytensor version pin in pymc (it’s already fixed in pytensor).


Excellent. Thanks so much for taking the time to help me 🙂

When I try to run that, I get the same error message. This is on a fresh reinstall, which works with the other, non-time-varying state space models I’ve built. I assume that means it’s an actual bug (which at this point would honestly be a relief).

If it’s a bug, it’s a bug with JAX on Windows, which is considered experimental by Google. The recommended way to use JAX on a Windows machine is via WSL.