Partially Observable Latent Variables

I’m trying to model a parameter as time-varying, but where I have strong priors on what the value should be at various points in time. This would be similar to the approach taken in Rolling Regression — PyMC example gallery for estimating alpha, but with prior information about what alpha should be at certain points (denoted with blue X's in the graph below).

I’m wondering what the various approaches for doing this are; my initial searches haven’t turned up much.

Curious if others have alternative approaches, but one option I can think of is to manually specify the random walk as a cumulative sum of innovations, interleaved with a prior on a specific point. Concretely, this would look something like:

import pymc as pm
import aesara.tensor as at
import numpy as np
import matplotlib.pyplot as plt

rng = np.random.default_rng(42)
# two random-walk segments; the second restarts its cumulative sum at 5, producing a jump at t = 200
data = np.concatenate([rng.normal(size=200).cumsum(), np.concatenate([[5], rng.normal(size=180)]).cumsum()])

_, ax = plt.subplots(1, 1)
ax.plot(data)
ax.set_title("Raw Data")


with pm.Model() as test_model:
    # std of random walk
    sigma_alpha = pm.Exponential("sigma_alpha", 50.0)

    # innovations of the random walk before the anchor point (t < 200)
    innovations1 = pm.Normal("innovations1", mu=0, sigma=sigma_alpha, shape=200)
    # informative prior on the level of the walk at t = 200
    midpoint = pm.Normal("midpoint", mu=0, sigma=1, shape=1)
    # innovations after the anchor point
    innovations2 = pm.Normal("innovations2", mu=0, sigma=sigma_alpha, shape=180)

    alpha = pm.Deterministic(
        "alpha",
        at.concatenate(
            [
                at.cumsum(innovations1),
                at.cumsum(at.concatenate([midpoint, innovations2])),
            ]
        ),
    )
    y = pm.Normal("y", mu=alpha, sigma=1, observed=data)
    idata = pm.sample_prior_predictive()
    idata.extend(pm.sample())

idata.prior.alpha.to_series().unstack(level=["chain", "draw"]).plot(title="alpha prior", legend=False)
idata.posterior.alpha.to_series().unstack(level=["chain", "draw"]).plot(title="alpha posterior", legend=False)

[Figure: prior draws of alpha]

[Figure: posterior draws of alpha]

One (hacky) strategy would be to use your initial model, but then slap a pm.Potential (or 2) in there to penalize deviations from the known(ish) y values at the associated x values. Maybe @jessegrabowski would have a more principled suggestion.

The problem with just inserting the known values into the GRW is that you get that “jumping” you see in the prior (and the posterior). If that large innovation at t=200 is acceptable to you, rock on. That’s definitely going to be the easiest way.

My first thought was a Potential as well. It would just need to be a quadratic loss, like pm.Potential('knowledge_penalty', -a * (innovations[200] - midpoint) ** 2), where a is a strength parameter you can play around with.
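For concreteness, a minimal sketch of that idea applied to a single GRW over the full series (rather than the split-innovations version above); known_value and the strength a are made-up numbers, and data is the series generated in the first post:

with pm.Model() as potential_model:
    sigma_alpha = pm.Exponential("sigma_alpha", 50.0)
    alpha = pm.GaussianRandomWalk(
        "alpha", sigma=sigma_alpha, init_dist=pm.Normal.dist(0, 1), shape=len(data)
    )

    # hypothetical "known" value of alpha at t = 200; `a` controls how hard
    # deviations from it are penalized
    known_value, a = 5.0, 10.0
    pm.Potential("knowledge_penalty", -a * (alpha[200] - known_value) ** 2)

    y = pm.Normal("y", mu=alpha, sigma=1, observed=data)

Increasing a makes the penalty act more like a hard constraint on alpha at t = 200.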

A more principled approach would require some more structure than just a GRW. My first thought was a GP prior, conditioned on the values you have prior information about. Something like this:

time_idx = np.arange(100)[:, None]

with pm.Model() as m:
    # lengthscale and amplitude of the GP over alpha
    ell_alpha = pm.Exponential("ell_alpha", 1)
    eta_alpha = pm.Exponential("eta_alpha", 1)

    alpha_cov = eta_alpha**2 * pm.gp.cov.Exponential(1, ls=ell_alpha)
    alpha_gp = pm.gp.Latent(cov_func=alpha_cov)
    alpha_gp.prior('alpha_uncond', X=time_idx)
    # prior information: alpha should be close to 1.2 at t = 50
    a50 = pm.Normal('a50', mu=1.2, sigma=0.1, shape=1)
    # condition the GP on that point and evaluate it over the full time index
    alpha = alpha_gp.conditional('alpha', Xnew=time_idx, given={'X': np.array([[50]]), 'f': a50})

1000 draws from alpha:

[Figure: 1000 draws of alpha from the conditioned GP]

I didn’t try sampling a posterior from it, but it will definitely struggle if the length of the time series is long.

Another option would be some kind of parametric function. You could ensure that a polynomial of order k > n passes through n specified points by fixing n of the coefficients. That would perform poorly for forecasting (all polynomial models do), but in-sample it might be reasonable.
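To make that concrete, here is a rough sketch (my own, not from the thread; the knot times, knot values, and the order of the free polynomial are invented) that forces the curve through the specified points by writing it as an interpolant through the knots plus a free polynomial multiplied by a factor that vanishes at the knots:

import numpy as np
import pymc as pm
import pytensor.tensor as pt

t = np.linspace(0, 1, 100)                 # scaled time
knot_t = np.array([0.25, 0.75])            # times with prior knowledge (hypothetical)
knot_y = np.array([-1.2, 1.2])             # "known" values there (hypothetical)

# interpolant through the knots, and a factor that is exactly zero at the knots
interp = np.polynomial.Polynomial.fit(knot_t, knot_y, deg=len(knot_t) - 1)(t)
vanish = np.prod(t[:, None] - knot_t[None, :], axis=1)

with pm.Model() as poly_model:
    # free polynomial q(t); multiplying by `vanish` keeps alpha equal to knot_y at knot_t
    q_coefs = pm.Normal("q_coefs", mu=0, sigma=1, shape=4)
    q = pt.dot(np.vander(t, 4), q_coefs)
    alpha = pm.Deterministic("alpha", interp + vanish * q)

In-sample this pins alpha exactly at the knots; how flexible it is elsewhere depends on the order of q.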

Finally, you could solve an optimization problem. You have a transition equation: x_{t+1} = x_t + \epsilon_t, for T timesteps. Say you “know” x_{50} = \bar x, and you sample the 99 i.i.d. epsilons (so they are also “known”). Now you have 99 unknowns x_0, x_1, \dots, x_{49}, x_{51}, \dots, x_{99} and 99 equations, a square system with a unique solution trajectory. You can run an optimizer to get the unique trajectory that satisfies x_{50} and the epsilons, and plug it into your model.
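In the plain GRW case the system can be solved by simply accumulating the innovations forward and backward from the anchor, so an optimizer is only really needed for more complicated transition equations. A rough numpy sketch, where the anchor index, anchor value, and innovation scale are made up:

import numpy as np

rng = np.random.default_rng(0)
T, anchor_idx, anchor_val = 100, 50, 5.0   # hypothetical anchor x_50 = 5

eps = rng.normal(scale=0.1, size=T - 1)    # the sampled ("known") innovations

x = np.empty(T)
x[anchor_idx] = anchor_val
# forward from the anchor: x[t + 1] = x[t] + eps[t]
for t in range(anchor_idx, T - 1):
    x[t + 1] = x[t] + eps[t]
# backward from the anchor: x[t] = x[t + 1] - eps[t]
for t in range(anchor_idx - 1, -1, -1):
    x[t] = x[t + 1] - eps[t]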

Thanks @jessegrabowski. You’re right that the jumping in my proposed solution is an undesirable feature for the problem I have. The GP looks promising, I will read up on that and try to play around with this approach.

@bwengals is the GP expert; I actually know very little about them. I’d be curious what he thinks of the approach, what I messed up in the implementation, and whether the HSGP approximation can be used in this case. I was also wondering if there’s any way to get a non-stationary GP (at least on some closed interval). One thing I don’t like about the prior I showed here is that it’s stationary, whereas the GRW is non-stationary. It seems like there should be a mapping between a GRW and a GP, but maybe not? I base this on the fact that there is a mapping between an AR(p) process and a GP [citation needed].

That’s a cool idea @jessegrabowski to use conditional there; I think that makes sense. Hopefully it’s not too slow; how much data are you dealing with? You can then plug your alpha into the likelihood. Whether a GP is stationary or not just depends on the covariance function you choose. There’s a covariance function that’s equivalent to a random walk (below). It’s definitely more efficient, though, to not use the covariance function / GP parameterization. You could try using it with conditional, but I think it might actually be equivalent to your original solution @mgilbert?

Here’s an example:

import arviz as az
import pymc as pm
import pytensor.tensor as pt
import numpy as np
import matplotlib.pyplot as plt

# random-walk (Wiener process) kernel: k(s, t) = min(s, t); the sigma**2 scaling is applied below
class GRWCov(pm.gp.cov.Covariance):
    def __init__(self, input_dim=1, active_dims=None):
        super().__init__(input_dim=input_dim, active_dims=active_dims)

    def full(self, X, Xs=None):
        if Xs is None:
            Xs = X
        return pt.minimum(X, Xs.T)

x = np.arange(100)
with pm.Model() as model:
    sigma = pm.HalfNormal("sigma", sigma=1) 
    cov_func = sigma**2 * GRWCov(input_dim=1)
    
    gp = pm.gp.Latent(cov_func=cov_func)
    f = gp.prior("f", X=x[:, None])
    
    prior = pm.sample_prior_predictive()

f = az.extract(prior, group="prior", var_names="f").data
plt.plot(x, f[:, [1,3,5]]);

Another option is maybe using a Bernstein polynomial basis? The coefficients for those are (approximately) the y-values at the knots, so you could set a strong prior on the ones where you have the blue X's.
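A rough sketch of that idea (my own; the basis order, knot position, and prior values are invented, and strictly speaking the coefficients only approximately equal the curve's value at interior knots):

import numpy as np
import pymc as pm
import pytensor.tensor as pt
from scipy.special import comb

n = 12                                   # basis order (hypothetical)
t = np.linspace(0, 1, 100)               # scaled time
# Bernstein basis design matrix, shape (len(t), n + 1)
B = np.stack([comb(n, k) * t**k * (1 - t) ** (n - k) for k in range(n + 1)], axis=1)

# diffuse priors everywhere, except a tight prior on the coefficient whose
# knot k / n sits at the time where the value is "known" (here t = 0.5, value 1.2)
mu = np.zeros(n + 1)
sigma = np.full(n + 1, 2.0)
mu[n // 2], sigma[n // 2] = 1.2, 0.1

with pm.Model() as bernstein_model:
    beta = pm.Normal("beta", mu=mu, sigma=sigma)
    alpha = pm.Deterministic("alpha", pt.dot(B, beta))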


I don’t think it’s the same as inserting the priors into a random walk, because the conditional marginalization gives information about the “knots” to the adjacent positions. With the random walk kernel you end up with some really coherent trajectories. This is two knots at t = 25 and t = 75; note that there are no huge discontinuous jumps like in the original solution:

[Figure: conditional GP draws with the random-walk kernel and knots at t = 25 and t = 75]

But the point about efficiency is well taken.

I have about 5-10 years of monthly data. @jessegrabowski do you have the code for that last chart you posted? Is this just the posterior from the original example you posted?

No, these are draws from the GP prior I first posted, but using the covariance function that @bwengals just posted, with given data alpha_knots = pm.Normal('alpha_knots', mu=[-1.2, 1.2], sigma=0.1) at t = 25, 75 and sigma = pm.HalfNormal("sigma", sigma=0.1).
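For reference, my guess at a reconstruction of that model, reusing the GRWCov class from above (the evaluation grid and the prior-predictive call are my assumptions, not from the thread):

x_full = np.arange(100)[:, None]
x_knots = np.array([[25], [75]])

with pm.Model() as knot_model:
    sigma = pm.HalfNormal("sigma", sigma=0.1)
    cov_func = sigma**2 * GRWCov(input_dim=1)
    gp = pm.gp.Latent(cov_func=cov_func)

    # "known" values at t = 25 and t = 75
    alpha_knots = pm.Normal("alpha_knots", mu=[-1.2, 1.2], sigma=0.1)
    # condition the GP on the knots and evaluate it over the full time index;
    # note the conditional covariance is (numerically) singular at the knot locations themselves
    alpha = gp.conditional("alpha", Xnew=x_full, given={"X": x_knots, "f": alpha_knots})

    knot_prior = pm.sample_prior_predictive()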