How to define time-varying state_intercept (Ct) in pymc-extras SSM?

Hi everyone,

I would like to build a state-space model using pymc-extras.
To simplify the question, here is a toy example where the latent state x_t depends on:
image

  • A and B are parameters to be estimated.
  • 𝑃(𝑡) is a known, time-varying input.

How can I define such a time-varying state_intercept C_t (B*𝑃(𝑡) in my example) in pymc-extras?

The real case is to build a two-state model. I wrote the code below (but got stuck on the part involving the commented-out state-intercept matrix). The time-varying input 𝑃(𝑡) is named PertSize.

class TwoState1SubjectModel(PyMCStateSpace):
def init(self, PertSize):
k_states = 3 # size of the state vector x
k_posdef = 3 # number of shocks (size of the state covariance matrix Q)
k_endog = 1 # number of observed states
self.PertSize = pt.as_tensor_variable(PertSize)
super().init(k_endog=k_endog, k_states=k_states, k_posdef=k_posdef)

def make_symbolic_graph(self):
    # Declare symbolic variables that represent parameters of the model
    x0 = self.make_and_register_variable("x0", shape=(3,))
    P0 = self.make_and_register_variable("P0", shape=(3, 3))
    Af = self.make_and_register_variable("Af", shape=()) #Afast, Aslow
    As = self.make_and_register_variable("As", shape=())
    Bf = self.make_and_register_variable("Bf", shape=()) #Bfast, Bslow
    Bs = self.make_and_register_variable("Bs", shape=())
    sigma_nu_fast = self.make_and_register_variable("sigma_nu_fast", shape=()) #sigma_fast
    sigma_nu_slow = self.make_and_register_variable("sigma_nu_slow", shape=()) #sigma_slow
    sigma_eta = self.make_and_register_variable("sigma_eta", shape=()) #sigma_y
    # PertSize = pm.Data("PertSize", PertSize)
    
    # Next, use these symbolic variables to build the statespace matrices by assigning each parameter
    # to its correct location in the correct matrix
    self.ssm["transition", :, :] = pt.stack([
        pt.stack([Af + Bf, 0.0, 0.0]),
        pt.stack([0.0, As + Bs, 0.0]),
        pt.stack([1.0, 1.0, 0.0])
    ])
    self.ssm["selection", :, :] = pt.stack([
    pt.stack([1.0, 0.0, Bf]),
    pt.stack([1.0, 0.0, Bs]),
    pt.stack([0.0, 0.0, 1.0])
    ])
    
    # self.ssm["state_intercept", :, :] = pt.stack([
    #     Bf * self.PertSize,
    #     Bs * self.PertSize,
    #     self.PertSize
    # ], axis=1)  # shape (n_timesteps, 3)

    self.ssm["design", :, :] = np.array([[0, 0, 1]])
    self.ssm["initial_state", :] = x0
    self.ssm["initial_state_cov", :, :] = P0
    self.ssm["state_cov", 0, 0] = sigma_nu_fast
    self.ssm["state_cov", 1, 1] = sigma_nu_slow
    self.ssm["state_cov", 2, 2] = sigma_eta
@property
def param_names(self):
        return ["x0", "P0", "Af", "As", "Bf", "Bs", "sigma_nu_fast", "sigma_nu_slow", "sigma_eta"]

tsm = TwoState1SubjectModel()
with pm.Model() as mod:
x0 = pm.Data(“x0”, np.zeros(3, dtype=“float”))
P0 = pm.Data(“P0”, np.eye(3) * 10.0)
Af = pm.Beta(“Af”, alpha=2, beta=2)
As = pm.Beta(“As”, alpha=2, beta=2)
Bf = pm.Beta(“Bf”, alpha=2, beta=2)
Bs = pm.Beta(“Bs”, alpha=2, beta=2)
sigma_nu_fast = pm.Exponential(“sigma_nu_fast”, 1)
sigma_nu_slow = pm.Exponential(“sigma_nu_slow”, 1)
sigma_eta = pm.Exponential(“sigma_eta”, 1)
# PertSize = pm.Data(“PertSize”, np.ones(402))

tsm.build_statespace_graph(data = Y)
# idata = pm.sample()

You’re on the right track, you just need to first create a matrix of zeros for the intercept term, then allocate the values you what where you want them. For example:

intercept = pt.zeros((None, 3))
intercept = intercept[:, 0].set(Bf)
intercept = intercept[:, 1].set(Bs)
self.ssm['state_intercept', :, :] intercept

Whenever you have something time varying, the time shape is going to be None, which is pytensor’s way of saying “unknown until compile time”

This conversation might be helpful

You can’t create pt.zeros with None

It can be a pt.tensor in that case, just need to also allocate whatever to the 3rd column

Thank you for the reply! Very helpful!

After I added a dimension to the left for timesteps, it worked.

But it comes up with another question that I didn’t find any discussion about :
I defined state names for fast and slow processes in my two-state model, but I cannot see any related output. How does this property work? Is it possible to get posterior samples for these latent variables directly?

@property
def state_names(self):
return [“Xf”, “Xs”, “Yfs”]


The code is attached below for anyone who might have similar questions:

from pymc_extras.statespace.models.utilities import make_default_coords

class TwoState1SubjectModel(PyMCStateSpace):
def init(self, n_timesteps=T, psize=PertSize):
k_states = 3 # size of the state vector x
k_posdef = 3 # number of shocks (size of the state covariance matrix Q)
k_endog = 1 # number of observed states

    self.n_timesteps = n_timesteps
    self.psize = psize
    
    super().__init__(k_endog=k_endog, k_states=k_states, k_posdef=k_posdef)

def make_symbolic_graph(self):
    # Declare symbolic variables that represent parameters of the model
    x0 = self.make_and_register_variable("x0", shape=(3,))
    P0 = self.make_and_register_variable("P0", shape=(3, 3))
    Af = self.make_and_register_variable("Af", shape=()) #Afast, Aslow
    As = self.make_and_register_variable("As", shape=())
    Bf = self.make_and_register_variable("Bf", shape=()) #Bfast, Bslow
    Bs = self.make_and_register_variable("Bs", shape=())
    sigma_nu_fast = self.make_and_register_variable("sigma_nu_fast", shape=()) #sigma_fast
    sigma_nu_slow = self.make_and_register_variable("sigma_nu_slow", shape=()) #sigma_slow
    sigma_eta = self.make_and_register_variable("sigma_eta", shape=()) #sigma_y
    
    # Next, use these symbolic variables to build the statespace matrices by assigning each parameter
    # to its correct location in the correct matrix
    self.ssm["transition", :, :] = pt.stack([
        pt.stack([Af + Bf, 0.0, 0.0]),
        pt.stack([0.0, As + Bs, 0.0]),
        pt.stack([1.0, 1.0, 0.0])
    ])
    self.ssm["selection", :, :] = pt.stack([
    pt.stack([1.0, 0.0, Bf]),
    pt.stack([1.0, 0.0, Bs]),
    pt.stack([0.0, 0.0, 1.0])
    ])
    
    state_intercept = pt.zeros((self.n_timesteps, self.k_states))
    state_intercept = state_intercept[:, 0].set(Bf*self.psize)
    state_intercept = state_intercept[:, 1].set(Bs*self.psize)
    state_intercept = state_intercept[:, 2].set(self.psize)
    self.ssm["state_intercept"] = state_intercept

    self.ssm["design", :, :] = np.array([[0, 0, 1]])
    self.ssm["initial_state", :] = x0
    self.ssm["initial_state_cov", :, :] = P0
    self.ssm["state_cov", 0, 0] = sigma_nu_fast
    self.ssm["state_cov", 1, 1] = sigma_nu_slow
    self.ssm["state_cov", 2, 2] = sigma_eta
@property
def param_names(self):
        return ["x0", "P0", "Af", "As", "Bf", "Bs", "sigma_nu_fast", "sigma_nu_slow", "sigma_eta"]
 
@property
def state_names(self):
    return ["Xf", "Xs", "Yfs"]

@property
def observed_states(self):
    return ["handerror"]

tsm = TwoState1SubjectModel(n_timesteps=T, psize=PertSize)
with pm.Model() as mod:
x0 = pm.Data(“x0”, np.zeros(3, dtype=“float”))
P0 = pm.Data(“P0”, np.eye(3) * 5.0)
Af = pm.Beta(“Af”, alpha=4, beta=2)
As = pm.Beta(“As”, alpha=8, beta=2)
Bf = pm.Beta(“Bf”, alpha=2, beta=8)
Bs = pm.Beta(“Bs”, alpha=2, beta=8)
sigma_nu_fast = pm.HalfNormal(“sigma_nu_fast”, sigma=2.0)
sigma_nu_slow = pm.HalfNormal(“sigma_nu_slow”, sigma=2.0)
sigma_eta = pm.HalfNormal(“sigma_eta”, sigma=5.0)

tsm.build_statespace_graph(data = Y_filled)

After you have posterior idata, you can use tsm.sample_conditional_posterior and tsm.sample_unconditional_posterior to sample the hidden states. Other post-estimation helpers include tsm.forecast and tsm.impulse_response_function. See here or here for worked examples.

We should write a tutorial notebook just about post-estimation tasks.