Stochastic Volatility with PyMCStateSpace

Hi!

First off, thank you for all of the work on PyMC! I am trying to implement a state space model from this paper (see appendix A for state space specification) using PyMCStateSpace. I currently have implemented it with time invariant obs_cov (see partial code below).

Is it possible to extend the model such that obs_cov varies over time (i.e. is a random walk process)?

from pymc_extras.statespace.utils.constants import (
    ALL_STATE_DIM,
    ALL_STATE_AUX_DIM,
    OBS_STATE_DIM,
    SHOCK_DIM,
    TIME_DIM,
)
from pymc_extras.statespace.core.statespace import PyMCStateSpace
from pymc_extras.statespace.models.utilities import make_default_coords

class VlekkeKoopmanMellens2021(PyMCStateSpace):
    def __init__(self):
        k_states = 5  # size of the state vector x
        k_posdef = 5  # number of shocks (size of the state covariance matrix Q)
        k_endog = 1  # number of observed states

        super().__init__(k_endog=k_endog, k_states=k_states, k_posdef=k_posdef)

    def make_symbolic_graph(self):
        exog_data = self.make_and_register_data("exog_data", shape=(None, self.k_states))
        x0 = self.make_and_register_variable('x0', shape=(self.k_states,))
        P0 = self.make_and_register_variable('P0', shape=(self.k_states, self.k_states))
        sigma_eps = self.make_and_register_variable('sigma_eps', shape=())
        sigma_eta = self.make_and_register_variable('sigma_eta', shape=(self.k_posdef,))

        self.ssm['transition', :, :] = np.eye(self.k_states)
        assert self.k_states == self.k_posdef
        self.ssm['selection', :, :] = np.eye(self.k_states, self.k_posdef)

        self.ssm['initial_state', :] = x0
        self.ssm['initial_state_cov', :, :] = P0
        self.ssm["design"] = pt.expand_dims(exog_data, 1)  
        self.ssm['obs_cov', *np.diag_indices(self.k_endog)] = sigma_eps
        self.ssm['state_cov', *np.diag_indices(self.k_posdef)] = sigma_eta

    @property
    def param_names(self):
        return ['x0', 'P0', 'sigma_eps', 'sigma_eta']

    @property
    def state_names(self):
        return ['beta', 'theta', 'phi', 'gamma_1', 'gamma_2']

    @property
    def shock_names(self):
        return [f'{state}_shock' for state in self.state_names]

    @property
    def observed_states(self):
        return ['pi']

    @property
    def param_dims(self):
        return {
            'x0': (ALL_STATE_DIM,),
            'P0': (ALL_STATE_DIM, ALL_STATE_AUX_DIM),
            'sigma_eps': (OBS_STATE_DIM,),
            'sigma_eta': (SHOCK_DIM,),
        }

    @property
    def coords(self):
        coords = make_default_coords(self)
        coords.update({'exog_data_dim': ['u_gap', 'pi_p', 'pi_h', 'rel_i_p', 'oil_p']})

        return coords

    @property
    def param_info(self):
        info = {
            'x0': {
                'shape': (self.k_states,),
                'constraints': 'None',
            },
            'P0': {
                'shape': (self.k_states, self.k_states),
                'constraints': 'Positive Semi-definite',
            },
            'sigma_eps': {
                'shape': (self.k_endog,),
                'constraints': 'Positive',
            },
            'sigma_eta': {
                'shape': (self.k_posdef,),
                'constraints': 'Positive',
            },
        }

        for name in self.param_names:
            info[name]['dims'] = self.param_dims[name]

        return info

    @property
    def data_names(self):
        return ['exog_data']
    
    @property
    def data_info(self):
        return {
            "exog_data": {
                "shape": (None, self.k_states),
                "dims": (TIME_DIM, "exog_data_dim")
            }
        }

Yes you can do this. All statespace matrices except x0 and P0 are allowed to have a time dimension on the far left. There is some discussion about how this works here.

To take advantage of this, you will need to register sigma_eta with a time dimension. You don’t know how long that is going to be until you see the data, so put None like this:

        sigma_eta = self.make_and_register_variable('sigma_eta', shape=(None, self.k_posdef,))

To allocate this into state_cov, you will need to make it into a stack of covariance matrices. You can do this with pt.diag, but you need to vectorize it:

        self.ssm['state_cov'] = pt.vectorize(pt.diag, '(a)->(a,b)')(sigma_eta)

Finally, add the dimension TIME_DIM to sigma_eta. If you’re feeling fancy you can also update the info to note that it needs to be time-varying.

Two final comments:

  1. The statespace model itself doesn’t know anything about the time dynamics, so you have to give sigma_eta a prior that encode that information (via pm.GaussianRandomWalk or whatever)
  2. Small quibble with names: your parameter is called sigma_eta, but it’s being put directly into the covariance matrix, so it’s actually sigma_eta_squared. This is important if you are doing a replication study/communicating your results.
1 Like
  1. Small quibble with names: your parameter is called sigma_eta, but it’s being put directly into the covariance matrix, so it’s actually sigma_eta_squared. This is important if you are doing a replication study/communicating your results.

Great point, I fixed it below.

Thanks to your help, I was able to get it to sample with the time-varying observation covariance (i.e. obs_cov not state_cov). However, now I get an error while running sample_conditional_posterior. I assume that I have made a mistake when specifying the model, but I am not sure what the error message tells me.

Any ideas on what I might be doing wrong?

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[300], line 1
----> 1 cond_post = vkm.sample_conditional_posterior(idata)

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pymc_extras/statespace/core/statespace.py:1345, in PyMCStateSpace.sample_conditional_posterior(self, idata, random_seed, **kwargs)
   1318 def sample_conditional_posterior(
   1319     self, idata: InferenceData, random_seed: RandomState | None = None, **kwargs
   1320 ):
   1321     """
   1322     Sample from the conditional posterior; that is, given parameter draws from the posterior distribution,
   1323     compute Kalman filtered trajectories. Trajectories are drawn from a single multivariate normal with mean and
   (...)
   1342          "predicted_posterior", and "smoothed_posterior".
   1343     """
-> 1345     return self._sample_conditional(idata, "posterior", random_seed, **kwargs)

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pymc_extras/statespace/core/statespace.py:1117, in PyMCStateSpace._sample_conditional(self, idata, group, random_seed, data, **kwargs)
   1101 group_idata = getattr(idata, group)
   1103 with pm.Model(coords=self._fit_coords) as forward_model:
   1104     (
   1105         [
   1106             x0,
   1107             P0,
   1108             c,
   1109             d,
   1110             T,
   1111             Z,
   1112             R,
   1113             H,
   1114             Q,
   1115         ],
   1116         grouped_outputs,
-> 1117     ) = self._kalman_filter_outputs_from_dummy_graph(data=data)
   1119     for name, (mu, cov) in zip(FILTER_OUTPUT_TYPES, grouped_outputs):
   1120         dummy_ll = pt.zeros_like(mu)

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pymc_extras/statespace/core/statespace.py:1001, in PyMCStateSpace._kalman_filter_outputs_from_dummy_graph(self, data, data_dims, scenario)
    998     scenario = dict()
   1000 pm_mod = modelcontext(None)
-> 1001 self._build_dummy_graph()
   1002 self._insert_random_variables()
   1004 for name in self.data_names:

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pymc_extras/statespace/core/statespace.py:965, in PyMCStateSpace._build_dummy_graph(self)
    954 """
    955 Build a dummy computation graph for the state space model matrices.
    956 
   (...)
    962     A list of pm.Flat variables representing all parameters estimated by the model.
    963 """
    964 for name in self.param_names:
--> 965     pm.Flat(
    966         name,
    967         shape=self._name_to_variable[name].type.shape,
    968         dims=self._fit_dims.get(name, None),
    969     )

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pymc/distributions/distribution.py:511, in Distribution.__new__(cls, name, rng, dims, initval, observed, total_size, transform, default_transform, *args, **kwargs)
    508     elif observed is not None:
    509         kwargs["shape"] = tuple(observed.shape)
--> 511 rv_out = cls.dist(*args, **kwargs)
    513 rv_out = model.register_rv(
    514     rv_out,
    515     name,
   (...)
    521     initval=initval,
    522 )
    524 # add in pretty-printing support

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pymc/distributions/continuous.py:369, in Flat.dist(cls, **kwargs)
    367 @classmethod
    368 def dist(cls, **kwargs):
--> 369     res = super().dist([], **kwargs)
    370     return res

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pymc/distributions/distribution.py:580, in Distribution.dist(cls, dist_params, shape, **kwargs)
    577     ndim_supp = cls.rv_op(*dist_params, **kwargs).owner.op.ndim_supp
    579 create_size = find_size(shape=shape, size=size, ndim_supp=ndim_supp)
--> 580 rv_out = cls.rv_op(*dist_params, size=create_size, **kwargs)
    582 _add_future_warning_tag(rv_out)
    583 return rv_out

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pytensor/tensor/random/op.py:315, in RandomVariable.__call__(self, size, name, rng, dtype, *args, **kwargs)
    313     props["dtype"] = dtype
    314     new_op = type(self)(**props)
--> 315     return new_op.__call__(
    316         *args, size=size, name=name, rng=rng, dtype=dtype, **kwargs
    317     )
    319 res = super().__call__(rng, size, *args, **kwargs)
    321 if name is not None:

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pytensor/tensor/random/op.py:319, in RandomVariable.__call__(self, size, name, rng, dtype, *args, **kwargs)
    314     new_op = type(self)(**props)
    315     return new_op.__call__(
    316         *args, size=size, name=name, rng=rng, dtype=dtype, **kwargs
    317     )
--> 319 res = super().__call__(rng, size, *args, **kwargs)
    321 if name is not None:
    322     res.name = name

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pytensor/graph/op.py:293, in Op.__call__(self, name, return_list, *inputs, **kwargs)
    249 def __call__(
    250     self, *inputs: Any, name=None, return_list=False, **kwargs
    251 ) -> Variable | list[Variable]:
    252     r"""Construct an `Apply` node using :meth:`Op.make_node` and return its outputs.
    253 
    254     This method is just a wrapper around :meth:`Op.make_node`.
   (...)
    291 
    292     """
--> 293     node = self.make_node(*inputs, **kwargs)
    294     if name is not None:
    295         if len(node.outputs) == 1:

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pytensor/tensor/random/op.py:349, in RandomVariable.make_node(self, rng, size, *dist_params)
    326 def make_node(self, rng, size, *dist_params):
    327     """Create a random variable node.
    328 
    329     Parameters
   (...)
    347 
    348     """
--> 349     size = normalize_size_param(size)
    351     dist_params = tuple(
    352         as_tensor_variable(p) if not isinstance(p, Variable) else p
    353         for p in dist_params
    354     )
    356     if rng is None:

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pytensor/tensor/random/utils.py:190, in normalize_size_param(shape)
    186     raise TypeError(
    187         "Parameter size must be None, an integer, or a sequence with integers."
    188     )
    189 else:
--> 190     shape = cast(as_tensor_variable(shape, ndim=1, dtype="int64"), "int64")
    192     if not isinstance(shape, Constant):
    193         # This should help ensure that the length of non-constant `size`s
    194         # will be available after certain types of cloning (e.g. the kind
    195         # `Scan` performs)
    196         shape = specify_shape(shape, (get_vector_length(shape),))

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pytensor/tensor/__init__.py:50, in as_tensor_variable(x, name, ndim, **kwargs)
     18 def as_tensor_variable(
     19     x: TensorLike, name: str | None = None, ndim: int | None = None, **kwargs
     20 ) -> "TensorVariable":
     21     """Convert `x` into an equivalent `TensorVariable`.
     22 
     23     This function can be used to turn ndarrays, numbers, `ScalarType` instances,
   (...)
     48 
     49     """
---> 50     return _as_tensor_variable(x, name, ndim, **kwargs)

File ~/miniconda3/envs/py312/lib/python3.12/functools.py:907, in singledispatch.<locals>.wrapper(*args, **kw)
    903 if not args:
    904     raise TypeError(f'{funcname} requires at least '
    905                     '1 positional argument')
--> 907 return dispatch(args[0].__class__)(*args, **kw)

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pytensor/tensor/basic.py:177, in _as_tensor_Sequence(x, name, ndim, dtype, **kwargs)
    172     # In this case, we have at least one non-`Constant` term, so we
    173     # couldn't get an underlying non-symbolic sequence of objects and we to
    174     # symbolically join terms.
    175     return stack(x)
--> 177 return constant(x, name=name, ndim=ndim, dtype=dtype)

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pytensor/tensor/basic.py:223, in constant(x, name, ndim, dtype)
    220     else:
    221         x = x.data
--> 223 x_ = ps.convert(x, dtype=dtype)
    225 if ndim is not None:
    226     if x_.ndim < ndim:

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pytensor/scalar/basic.py:249, in convert(x, dtype)
    247     if dtype == "floatX":
    248         dtype = config.floatX
--> 249     x_ = np.asarray(x).astype(dtype)
    250 else:
    251     # In this case, this function should infer the dtype according to the
    252     # autocasting rules. See autocasting above.
    253     x_ = None

TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'

For reference, I made the changes and ended up with the following model definition:

from pymc_extras.statespace.utils.constants import (
    ALL_STATE_DIM,
    ALL_STATE_AUX_DIM,
    OBS_STATE_DIM,
    SHOCK_DIM,
    TIME_DIM,
)
from pymc_extras.statespace.core.statespace import PyMCStateSpace
from pymc_extras.statespace.models.utilities import make_default_coords

class VlekkeKoopmanMellens2021(PyMCStateSpace):
    def __init__(self):
        k_states = 5 # size of the state vector x
        k_posdef = 5 # number of shocks (size of the state covariance matrix Q)
        k_endog = 1  # number of observed states

        super().__init__(k_endog=k_endog, k_states=k_states, k_posdef=k_posdef)

    def make_symbolic_graph(self):
        exog_data = self.make_and_register_data('exog_data', shape=(None, self.k_states))
        x0 = self.make_and_register_variable('x0', shape=(self.k_states,))
        P0 = self.make_and_register_variable('P0', shape=(self.k_states, self.k_states))
        h = self.make_and_register_variable('h', shape=(None, self.k_endog))
        sigma_sq_eta = self.make_and_register_variable('sigma_sq_eta', shape=(self.k_posdef,))

        self.ssm['transition', :, :] = np.eye(self.k_states)
        assert self.k_states == self.k_posdef
        self.ssm['selection', :, :] = np.eye(self.k_states, self.k_posdef)

        self.ssm['initial_state', :] = x0
        self.ssm['initial_state_cov', :, :] = P0
        self.ssm['design'] = pt.expand_dims(exog_data, 1)  
        self.ssm['obs_cov'] = pt.vectorize(pt.diag, '(a)->(a,b)')(pt.exp(h))
        self.ssm['state_cov', *np.diag_indices(self.k_posdef)] = sigma_sq_eta

    @property
    def param_names(self):
        return ['x0', 'P0', 'h', 'sigma_sq_eta']

    @property
    def state_names(self):
        return ['beta', 'theta', 'phi', 'gamma_1', 'gamma_2']

    @property
    def shock_names(self):
        return [f'{state}_shock' for state in self.state_names]

    @property
    def observed_states(self):
        return ['pi']

    @property
    def param_dims(self):
        return {
            'x0': (ALL_STATE_DIM,),
            'P0': (ALL_STATE_DIM, ALL_STATE_AUX_DIM),
            'h': (TIME_DIM, OBS_STATE_DIM),
            'sigma_sq_eta': (SHOCK_DIM,),
        }

    @property
    def coords(self):
        coords = make_default_coords(self)
        coords.update({'exog_data_dim': ['u_gap', 'pi_p', 'pi_h', 'rel_i_p', 'oil_p']})

        return coords

    @property
    def param_info(self):
        info = {
            'x0': {
                'shape': (self.k_states,),
                'constraints': 'None',
            },
            'P0': {
                'shape': (self.k_states, self.k_states),
                'constraints': 'Positive Semi-definite',
            },
            'h': {
                'shape': (None, self.k_endog),
                'constraints': 'Time-varying',
            },
            'sigma_sq_eta': {
                'shape': (self.k_posdef,),
                'constraints': 'Positive',
            },
        }

        for name in self.param_names:
            info[name]['dims'] = self.param_dims[name]

        return info

    @property
    def data_names(self):
        return ['exog_data']
    
    @property
    def data_info(self):
        return {
            'exog_data': {
                'shape': (None, self.k_states),
                'dims': (TIME_DIM, 'exog_data_dim')
            }
        }

The model sampling block looks as follows:

# Note: model_df is omitted for brevity
exog_data_df = model_df[['u_gap', 'e_p_cpi_1_meb', 'e_h_cpi_1_meb', 'rel_i_p', 'oil_p']]
obs_df = model_df[['cpi_meb']]

vkm = VlekkeKoopmanMellens2021()

b_40 = np.zeros(5, dtype="float")
P_40 = np.eye(5) * 0.1
h0_mu, h0_sigma = 0.0, 0.25

with pm.Model(coords=vkm.coords) as mod:
    x0 = pm.Deterministic('x0', pt.as_tensor(b_40), dims=[ALL_STATE_DIM])
    P0 = pm.Deterministic('P0', pt.as_tensor(P_40), dims=[ALL_STATE_DIM, ALL_STATE_AUX_DIM])
    h0 = pm.Normal.dist(mu=h0_mu, sigma=h0_sigma)

    # For sigma_sq_nu and sigma_sq_eta priors, see table 2 in appendix C in the refrence paper
    sigma_sq_nu = pm.InverseGamma('sigma_sq_nu', alpha=(vkm.k_states + 2) / 2, beta=0.454 / 2)
    sigma_sq_eta = pm.InverseGamma('sigma_sq_eta', alpha=(vkm.k_states + 2) / 2, beta=0.268 / 2, dims=[SHOCK_DIM])

    h = pm.GaussianRandomWalk('h', sigma=pt.sqrt(sigma_sq_nu), init_dist=h0, shape=obs_df.shape, dims=[TIME_DIM, OBS_STATE_DIM])
    sigma_sq_eps = pm.Deterministic('sigma_sq_eps', pt.exp(h), dims=[TIME_DIM, OBS_STATE_DIM])
    
    exog_data = pm.Data('exog_data', exog_data_df, dims=[TIME_DIM, 'exog_data_dim'])

    vkm.build_statespace_graph(data=obs_df, mode='JAX')
    idata = pm.sample(nuts_sampler='numpyro', chains=4, draws=1000)

This is a bug, the problem is that the model needs to record the shape of h after you’re done fitting. You can reproduce the error like this:

with pm.Model() as m:
    pm.Flat('x', shape=(None, 10), dims=None)

If you could open an issue on pymc-extras I’d really appreciate it. You can basically just copy/paste your post.

In the meantime, I think you should be able to do a hack work-around by specifying the shape of h in the _name_to_variable dictionary, as in:

h = vkm._name_to_variable['h'] 
h = pt.specify_shape(h, (obs_df.shape[0], vkm.k_endog))
h.name = 'h'
vkm._name_to_variable['h']  = h

pt.specify_shape returns a copy of a variable with fixed shapes. One negative though is that it doesn’t propagate the name of the variable, that’s why I set it again (it probably doesn’t matter but I like to do it).

Simple usage:

import pytensor.tensor as pt
x = pt.tensor('x', shape=(None, 3))
print(x.type.shape) # (None, 3)
x = pt.specify_shape(x, (10, 3))
print(x.type.shape) # (10, 3)
1 Like

I will create an issue in pymc-extras (edit: it is issue #412).

I assume that I have to apply the workaround after sampling, before calling sample_conditional_posterior? In this case I get a different error:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[88], line 6
      3 h.name = 'h'
      4 vkm._name_to_variable['h']  = h
----> 6 cond_post = vkm.sample_conditional_posterior(idata)

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pymc_extras/statespace/core/statespace.py:1345, in PyMCStateSpace.sample_conditional_posterior(self, idata, random_seed, **kwargs)
   1318 def sample_conditional_posterior(
   1319     self, idata: InferenceData, random_seed: RandomState | None = None, **kwargs
   1320 ):
   1321     """
   1322     Sample from the conditional posterior; that is, given parameter draws from the posterior distribution,
   1323     compute Kalman filtered trajectories. Trajectories are drawn from a single multivariate normal with mean and
   (...)
   1342          "predicted_posterior", and "smoothed_posterior".
   1343     """
-> 1345     return self._sample_conditional(idata, "posterior", random_seed, **kwargs)

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pymc_extras/statespace/core/statespace.py:1117, in PyMCStateSpace._sample_conditional(self, idata, group, random_seed, data, **kwargs)
   1101 group_idata = getattr(idata, group)
   1103 with pm.Model(coords=self._fit_coords) as forward_model:
   1104     (
   1105         [
   1106             x0,
   1107             P0,
   1108             c,
   1109             d,
   1110             T,
   1111             Z,
   1112             R,
   1113             H,
   1114             Q,
   1115         ],
   1116         grouped_outputs,
-> 1117     ) = self._kalman_filter_outputs_from_dummy_graph(data=data)
   1119     for name, (mu, cov) in zip(FILTER_OUTPUT_TYPES, grouped_outputs):
   1120         dummy_ll = pt.zeros_like(mu)

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pymc_extras/statespace/core/statespace.py:1002, in PyMCStateSpace._kalman_filter_outputs_from_dummy_graph(self, data, data_dims, scenario)
   1000 pm_mod = modelcontext(None)
   1001 self._build_dummy_graph()
-> 1002 self._insert_random_variables()
   1004 for name in self.data_names:
   1005     if name not in pm_mod:

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pymc_extras/statespace/core/statespace.py:708, in PyMCStateSpace._insert_random_variables(self)
    705 matrices = list(self._unpack_statespace_with_placeholders())
    707 replacement_dict = {var: pymc_model[name] for name, var in self._name_to_variable.items()}
--> 708 self.subbed_ssm = graph_replace(matrices, replace=replacement_dict, strict=True)

File ~/miniconda3/envs/py312/lib/python3.12/site-packages/pytensor/graph/replace.py:180, in graph_replace(outputs, replace, strict)
    178     non_fg_replace = {r: v for r, v in replace_dict.items() if r not in equiv}
    179     if non_fg_replace:
--> 180         raise ValueError(f"Some replacements were not used: {non_fg_replace}")
    181 toposort = fg.toposort()
    183 def toposort_key(
    184     fg: FunctionGraph, ts: list[Apply], pair: tuple[Variable, Variable]
    185 ) -> int:

ValueError: Some replacements were not used: {h: h}

Dang, nothing is ever easy. I’ll look more closely tonight and get back to you.

1 Like

Just wanted to ask if you have any other ideas for a workaround. If not or if it is non-trivial then it is fine as well, in that case I will make due without stochastic volatility.

I opened a PR that should fix it. You can pip install my development branch directly if you really can’t wait, otherwise it should be merged in the next few days.

Awesome. Thanks!