Hey @ricardoV94, thanks for your fast reply! Yes, you are right, separating both states and performing two scans would work out with this model.
However, for many state space models, the states are depending on each other, which makes such a separation in general very difficult. My previous model is maybe a bit too simple to illustrate this, so I tried to quickly assemble another model with interdependent random and deterministic states:
def step(state, d, k, sigma, dt):
v, x = state
v_next = -pm.Normal.dist(mu=d * dt * v * pt.abs(v), sigma=sigma) - k * dt * pt.sin(x) + v
x_next = v * dt + x # This is causing a ValueError, as described above
# x_next = v * dt + pm.Normal.dist(mu=x, sigma=0.001) # Again, this would work - but it's not physical
state_next = pt.stack([v_next, x_next])
return state_next, pm.pytensorf.collect_default_updates(outputs=[state_next], inputs=[state, d, k, sigma, dt])
def statespace(v0, x0, d, k, sigma, dt, n_steps, *args, **kwargs):
state0 = pt.stack([v0, x0])
state, updates = pytensor.scan(
fn=step,
sequences=[],
outputs_info=state0,
non_sequences=[d, k, sigma, dt],
n_steps=n_steps,
name='statespace',
strict=True,
)
return state
with pm.Model() as model:
time = pd.RangeIndex(0, 200)
dt = time.step
n_steps = pm.Data('n_steps', time.size).astype('int') # For whatever reason, I'm getting an AssertionError during posterior sampling if this is not casted into pm.Data
model.add_coord('idx_time', time)
model.add_coord('states', ['v', 'x'])
k = pm.Gamma('k', mu=0.02, sigma=0.01)
d = pm.Gamma('d', mu=0.4, sigma=0.2)
sigma = pm.Gamma('sigma', mu=0.02, sigma=0.01)
x0 = pm.Normal('x0', mu=0, sigma=0.5)
v0 = pm.Normal('v0', mu=0, sigma=0.5)
eps = 0.01
state = pm.CustomDist('state', v0, x0, d, k, sigma, dt, n_steps, dist=statespace, dims=('idx_time', 'states'))
v = state[:, 0]
x = state[:, 1]
x_obs = pm.Normal('x_obs', mu=x, sigma=eps, dims=('idx_time'))
# Generate fake data
with pm.do(model, {k: 0.02, d: 0.4, sigma: 0.02, x0: 2.4, v0: 0}) as model_generative:
idata_generative = pm.sample_prior_predictive()
data_x = idata_generative.prior.x_obs.sel(chain=0, draw=42).values
# Sampling
with pm.observe(model, {x_obs: data_x}) as model_observed:
idata = pm.sample_prior_predictive()
idata.extend(pm.sample(nuts_sampler='numpyro'))
idata = pm.sample_posterior_predictive(idata, extend_inferencedata=True, predictions=False)
This model is more or less representing a pendulum damped by turbulent drag. According to kinematics, position x
and velocity v
have a deterministic relationship, but the incremental drag force is random. As you can see, v_next
and x_next
are depending on the pervious values of both states - So, a simple separation into two scans would not be possible here…
I had also in mind to separate the states after applying scan - but so far without any success:
state = statespace(v0, x0, d, k, sigma, dt, n_steps)
v = state[:, 0]
x = state[:, 1]
model.register_rv(v, 'v', dims=('idx_time',))
x = pm.Deterministic('x', x, dims=('idx_time',))
...