How do I set a time-varying observation intercept in a state-space model

Hello,
How do I set a time-varying observation intercept in a state-space model? According to the comments on this question, it’s supported. I want to do this instead of using the RegressionComponent because I have a fairly complex deterministic function that I want to subtract off from the endogenous variable. The example below is substantially simplified. Are there any examples of 2d time-varying parameters I can look at?

If I understand this correctly you should it should be possible to add it by specifying a two-dimensional observation intercept with the first-dimension being time. Originally, I was trying to modify with the structural class. I was able to get a time-varying observation covariance to work by adding a third dimension. To simplify the code, I modified the the example notebook AutoRegressiveThree(PyMCStateSpace) .

class AutoRegressiveThree(PyMCStateSpace):
    def __init__(self):
        k_states = 3  # size of the state vector x
        k_posdef = 1  # number of shocks (size of the state covariance matrix Q)
        k_endog = 1  # number of observed states
        self.time_dim = 100

        super().__init__(k_endog=k_endog, k_states=k_states, k_posdef=k_posdef)

    def make_symbolic_graph(self):
        x0 = self.make_and_register_variable("x0", shape=(3,))
        P0 = self.make_and_register_variable("P0", shape=(3, 3))
        ar_params = self.make_and_register_variable("ar_params", shape=(3,))
        sigma_x = self.make_and_register_variable("sigma_x", shape=())
        beta = self.make_and_register_variable("beta", shape=())
        time_trend = self.make_and_register_variable('time_trend', shape=(self.time_dim,))

        self.ssm["transition", :, :] = np.eye(3, k=-1)
        self.ssm["selection", 0, 0] = 1
        self.ssm["design", 0, 0] = 1

        self.ssm["initial_state", :] = x0
        self.ssm["initial_state_cov", :, :] = P0
        self.ssm["transition", 0, :] = ar_params
        self.ssm["state_cov", :, :] = sigma_x
        self.ssm['obs_cov', :, :] = beta * time_trend

    @property
    def param_names(self):
        return ["x0", "P0", "ar_params", "sigma_x", 'beta', 'time_trend']

The model is here

ar3 = AutoRegressiveThree()
with pm.Model() as mod:
    x0 = pm.Deterministic("x0", pt.arange(3, dtype="float"))
    P0 = pm.Deterministic("P0", pt.eye(3) * 10.0)
    ar_params = pm.Deterministic("ar_params", pt.as_tensor_variable([10.0, 11.0, 12.0]))
    sigma_x = pm.Deterministic("sigma_x", pt.as_tensor_variable(13.0, dtype="float64"))
    beta = pm.Deterministic("beta", pt.as_tensor_variable(14.0, dtype='float64'))
    time_trend = pm.Deterministic("time_trend", pt.arange(100).astype('float64'))

    ar3._insert_random_variables()
``

Looking at the shape  `ar3.unpack_statespace()[-2].shape.eval()` we get `array([1, 1])` instead of `array([100, 1])` as expected.   Any help would be great appreciated.

@

Hello! @jessegrabowski! You appear to be the resident expert on state space models. Do you know how to make the observation intercept time-varying? Any thoughts you’d have would be much appreciated!

Thanks!
Paul

Hi Paul,

I just wanted to let you know I saw this and haven’t forgotten about it – I’ll write up something to try to help tomorrow

Thanks for the response! I think I might have figured out the issue affecting my original example, where I was using adapting the structural code. Some of the representation code that checks to see if the matrices are the right size doesn’t handle time-varying observation intercepts even though they’re actually valid.

Here’s a git diff of the change I made to representation.py.


diff --git a/pymc_extras/statespace/core/representation.py b/pymc_extras/statespace/core/representation.py
index 450108d..f563ca8 100644
--- a/pymc_extras/statespace/core/representation.py
+++ b/pymc_extras/statespace/core/representation.py
@@ -298,7 +298,7 @@ class PytensorRepresentation:
                     )
 
                 # Time varying vector case, check only the static shapes
-                if X.ndim == 2 and X.shape[1:] != expected_shape:
+                if X.ndim == 2 and shape[1:] != expected_shape:
                     raise ValueError(
                         f"Last dimension of array provided for {name} has shape {X.shape[1]}, "
                         f"expected {expected_shape}"
@@ -334,7 +334,7 @@ class PytensorRepresentation:
                 X = pt.expand_dims(X, (0,))
                 X = pt.specify_shape(X, self.shapes[name])
 
-            elif X.ndim == 2:
+            elif X.ndim == 2 and name not in VECTOR_VALUED:
                 X = pt.expand_dims(X, (0,))
                 X = pt.specify_shape(X, self.shapes[name])
 
sangrey@MAC-PSANGREY pymc_extras %

Once I make those changes, the code runs and appears to have the correct shapes. I could potentially submit a pull request if that would be helpful. Presumably, the simplified example above has another issue because it still isn’t working.

Hey @sangrey, Jesse will be able to give you a much better answer than what I have put together for you, but in the meantime this might point you in the right direction.

There are two key steps to setting a time-varying observation intercept:
1). You need to assign the first dimension as the time dimension
2). You need to make sure that you properly configure your model dims to treat the first dimension as the time dimension

Below is a modification of the AR3 model derived from the example notebook to include a time-varying observation intercept:

import numpy as np
from pymc_extras.statespace.core.statespace import PyMCStateSpace
import pytensor.tensor as pt
import pymc as pm

from pymc_extras.statespace.utils.constants import (
    ALL_STATE_DIM,
    ALL_STATE_AUX_DIM,
    SHOCK_DIM,
    TIME_DIM
)

class AutoRegressiveThree(PyMCStateSpace):
    def __init__(self):
        k_states = 3  
        k_posdef = 1 
        k_endog = 1 

        super().__init__(k_endog=k_endog, k_states=k_states, k_posdef=k_posdef)

    def make_symbolic_graph(self):
        x0 = self.make_and_register_variable("x0", shape=(3,))
        P0 = self.make_and_register_variable("P0", shape=(3, 3))
        ar_params = self.make_and_register_variable("ar_params", shape=(3,))
        sigma_x = self.make_and_register_variable("sigma_x", shape=())
        alpha_t = self.make_and_register_variable("alpha_t", shape=(None, 1)) # Time varying observation intercept first dimension is the TIME DIM

        self.ssm["transition", :, :] = np.eye(3, k=-1)
        self.ssm["selection", 0, 0] = 1
        self.ssm["design", 0, 0] = 1

        self.ssm["initial_state", :] = x0
        self.ssm["initial_state_cov", :, :] = P0
        self.ssm["transition", 0, :] = ar_params
        self.ssm["state_cov", :, :] = sigma_x
        self.ssm["obs_intercept"] = alpha_t

    @property
    def param_names(self):
        return ["x0", "P0", "ar_params", "sigma_x", "alpha_t"]
    
    @property
    def param_dims(self):
        return {
            "x0": (ALL_STATE_DIM,),
            "P0": (ALL_STATE_DIM, ALL_STATE_AUX_DIM),
            "ar_params": ("ar_lags",),
            "sigma_x": (SHOCK_DIM,),
            "alpha_t": (TIME_DIM,) # Need to also make sure you designate the first dimension as the TIME DIM
        }

n = 100 
phi = [0.6, -0.4, 0.2] 
sigma = 1.0 

noise = np.random.normal(loc=0, scale=sigma, size=n)
x = np.zeros(n)

# Generate AR(3) process
for t in range(3, n):
    x[t] = phi[0]*x[t-1] + phi[1]*x[t-2] + phi[2]*x[t-3] + noise[t]

data = x[:, np.newaxis]

ar3 = AutoRegressiveThree()
with pm.Model() as pymc_mod:
    x0 = pm.Deterministic(
        "x0",
        pt.zeros(
            3,
        ),
    )
    P0 = pm.Deterministic("P0", pt.eye(3) * 10)

    ar_params = pm.Normal("ar_params", sigma=0.25, shape=(3,))
    sigma_x = pm.Exponential("sigma_x", 1)
    alpha_t = pm.Deterministic("alpha_t", pt.expand_dims(pt.arange(100, dtype="float64"), 1))

    ar3.build_statespace_graph(data=data, mode="JAX")

with pymc_mod:
    idata = pm.sample(nuts_sampler="nutpie", nuts_sampler_kwargs={"backend": "jax", "gradient_backend": "jax"})

# Confirm the shape is (chains, draws, time-dimension, dimension_of_observations)
idata.posterior['alpha_t'].shape

I commented the relevant sections inside the PyMCStateSpace definition that correspond to the two items I mentioned above. I hope this helps you somewhat!

That’s interesting. The actual code that I am using was adapted from the structural class. It runs fine with the changes I placed above, but crashes without them. I’m calling make_and_register_variable with the name and shape and specifying the dim names. The other thing I noticed is if I replace self._time_dim below with None, I get errors saying that the shape has to be determined at build time.

class ObsIntercept(Component2):

    def __init__(self, endog_names, time_dim, name: str = "y"):

        self._time_dim = time_dim
        super().__init__(
            name=name,
            endog_names=endog_names,
            k_endog=len(endog_names),
            k_states=0,
            k_posdef=0,
            measurement_error=False,
            combine_hidden_states=False,
        )

    def populate_component_properties(self):

        self.param_info = {
            f"mu": {
                "shape": (
                    self._time_dim,
                    self.k_endog,
                ),
                "constraints": None,
                "dims": (
                    TIME_DIM,
                    pmx.statespace.utils.constants.OBS_STATE_DIM,
                ),
            },
        }
        self.param_dims = {name: val["dims"] for name, val in self.param_info.items()}
        self.param_names = list(self.param_info.keys())

    def make_symbolic_graph(self) -> None:

        mu = self.make_and_register_variable(
            f"mu",
            shape=(
                self._time_dim,
                self.k_endog,
            ),
        )

        self.ssm["obs_intercept"] = mu

Here are the classes I am inheriting from. They’re lightly modified from the structural time series classes to be able to handle multiple endogenous variables. I doubt those changes matter here.

class StructuralTimeSeries2(pmx.statespace.models.structural.StructuralTimeSeries):
    r"""
    Structural Time Series Model

    The structural time series model, named by [1] and presented in statespace form in [2], is a framework for
    decomposing a univariate time series into level, trend, seasonal, and cycle components. It also admits the
    possibility of exogenous regressors. Unlike the SARIMAX framework, the time series is not assumed to be stationary.

    Notes
    -----

    .. math::
         y_t = \mu_t + \gamma_t + c_t + \varepsilon_t

    """

    def __init__(self, endog_names, **kwargs):

        self._endog_names = endog_names

        super().__init__(**kwargs)

    @property
    def observed_states(self):
        return self._endog_names


def order_to_mask(order):
    if isinstance(order, int):
        return np.ones(order).astype(bool)
    else:
        return np.array(order).astype(bool)


class Component2(pmx.statespace.models.structural.Component):

    def __init__(self, endog_names, **kwargs):

        self.endog_names = endog_names

        super().__init__(**kwargs)

    def build(self, name=None, filter_type="standard", verbose=True):
        """
        Build a StructuralTimeSeries statespace model from the current component(s)

        Parameters
        ----------
        name: str, optional
            Name of the exogenous data being modeled. Default is "data"

        filter_type : str, optional
            The type of Kalman filter to use. Valid options are "standard", "univariate", "single", "cholesky", and
            "steady_state". For more information, see the docs for each filter. Default is "standard".

        verbose : bool, optional
            If True, displays information about the initialized model. Defaults to True.

        Returns
        -------
        PyMCStateSpace
            An initialized instance of a PyMCStateSpace, constructed using the system matrices contained in the
            components.
        """

        return StructuralTimeSeries2(
            endog_names=self.endog_names,
            ssm=self.ssm,
            name=name,
            state_names=self.state_names,
            data_names=self.data_names,
            shock_names=self.shock_names,
            param_names=self.param_names,
            param_dims=self.param_dims,
            coords=self.coords,
            param_info=self.param_info,
            data_info=self.data_info,
            component_info=self._component_info,
            measurement_error=self.measurement_error,
            exog_names=self.exog_names,
            name_to_variable=self._name_to_variable,
            name_to_data=self._name_to_data,
            filter_type=filter_type,
            verbose=verbose,
        )

    def __add__(self, other):

        if self.endog_names != other.endog_names:
            raise ValueError("The endog names must be the same.")

        state_names = self._combine_property(other, "state_names")
        data_names = self._combine_property(other, "data_names")
        param_names = self._combine_property(other, "param_names")
        shock_names = self._combine_property(other, "shock_names")
        param_info = self._combine_property(other, "param_info")
        data_info = self._combine_property(other, "data_info")
        param_dims = self._combine_property(other, "param_dims")
        coords = self._combine_property(other, "coords")
        exog_names = self._combine_property(other, "exog_names")

        _name_to_variable = self._combine_property(other, "_name_to_variable")
        _name_to_data = self._combine_property(other, "_name_to_data")

        measurement_error = any([self.measurement_error, other.measurement_error])

        k_states, k_posdef, k_endog = self._get_combined_shapes(other)

        ssm = self._combine_statespace_representations(other)

        new_comp = Component2(
            name="",
            endog_names=self.endog_names,
            k_endog=k_endog,
            k_states=k_states,
            k_posdef=k_posdef,
            measurement_error=measurement_error,
            representation=ssm,
            component_from_sum=True,
        )
        new_comp._component_info = self._combine_component_info(other)
        new_comp.name = new_comp._make_combined_name()

        names_and_props = [
            ("state_names", state_names),
            ("data_names", data_names),
            ("param_names", param_names),
            ("shock_names", shock_names),
            ("param_dims", param_dims),
            ("coords", coords),
            ("param_dims", param_dims),
            ("param_info", param_info),
            ("data_info", data_info),
            ("exog_names", exog_names),
            ("_name_to_variable", _name_to_variable),
            ("_name_to_data", _name_to_data),
        ]

        for prop, value in names_and_props:
            setattr(new_comp, prop, value)

        return new_comp

Yes, could you open a PR? I think these changes look good, but would like to see that the whole test suite still passes.

The first change (checking X.type.shape instead of X.shape) is 100% definitely a bugfix. X.shape is a symbolic tensor, so as written, it will always evaluate to False. I’m a bit upset no existing test caught that.

The second change also makes sense, but it’s less clear what that function is doing (who wrote this code anyway!?!?!). Basically it’s crying out for more documentation.

If you’d also be willing to open a PR with the Intercept component that would be great too, it seems generically quite useful (we can have an observation: bool argument to put it in either the measurement or transition equation).

Finally finally, it seems like you’re attacking this issue? I just made it, but it was on the statespace roadmap as highly important from the beginning. I was gearing up to work on that this week, but if you’ve already done stuff I’d like to see and adapt your code. Or, again, if you’re willing to open a PR and we can just iterate from there.

Here’s a clearer and more polished version of your message that preserves all the important content and intent:


Hi all,

I’ve opened a pull request to incorporate the changes to the Representation class here:
:link: https://github.com/pymc-devs/pymc-extras/pull/490

I also started merging some of the changes into structural.py. Previously, I had been inheriting from the structural component classes instead of modifying them directly. I’ve now tried to fold those changes back into the base classes, but I may have introduced some issues in the process.

Unfortunately, I haven’t been able to fully test this. We’ve been running things on Databricks internally, and I haven’t had much luck getting pymc-extras to build cleanly there due to hatch dependency resolution issues.


On a broader note, I was able to get a more customized model running internally (which wouldn’t make sense to merge as-is). However, it was extremely slow (About a week to train). I was estimating a model with:

  • ~1800 time periods,
  • k_endog = 4,
  • time-varying measurement error,
  • and a time-varying observation intercept.

The likelihood itself only takes ~0.2 seconds to evaluate, but with gradient calculations included, it looked like it would take about a week to run. When profiling, the entire cost seems to be coming from inside the scan. Am I correct in assuming that the gradient of the scan is the main bottleneck?

Interestingly, a functionally similar model using pymc.GaussianRandomWalk and pymc.AR finishes in about 3 hours — presumably because it avoids scan, at least in the same way.

Also, when I try to profile the gradient directly, I run into errors related to missing .data attributes. Is it expected that you can’t profile the gradient when scan is involved?

As a result, I’ve had to move away from using the state-space model, and put this work on the back burner. If the changes to structural.py aren’t useful, feel free to disregard. I just thought I’d share since you asked.