On second reading, it doesn’t seem you want an optimal policy — just to do online filtering? If that’s the case, the setup is much easier: just compile the required pytensor functions and feed in data:
import pytensor
import pytensor.tensor as pt
from pymc_extras.statespace.filters import StandardFilter
import numpy as np

# Use the model from the textbook "dog tracking" example.
# Symbolic scalar hyperparameters of the model:
dt = pt.dscalar('dt')                                  # time step between observations
sigma_sq = pt.dscalar('sigma_sq')                      # process (innovation) variance
measurement_sigma_sq = pt.dscalar('measurement_sigma_sq')  # observation noise variance
data = pt.tensor('data', shape=(1,))                   # one scalar observation per step

# All the names are different from the textbook; we are consistent with Durbin and Koopman.
# In general, engineers and economists disagree on the names, but shuffle the same letters :)
x = pt.tensor('x', shape=(2,))        # state mean: [position, velocity]
P = pt.tensor('P', shape=(2, 2))      # state covariance
c = pt.zeros((2,))                    # state intercept (none here)
d = pt.zeros((1,))                    # observation intercept (none here)

# Constant-velocity transition matrix:
# T = [[1.0, dt],
#      [0.0, 1.0]]
T = pt.eye(2)[0, 1].set(dt)
Z = pt.zeros((1, 2))[0, 0].set(1.)    # we observe position only
R = pt.eye(2)                         # shock selection matrix

H = pt.eye(1) * measurement_sigma_sq  # observation noise covariance

# This equation comes from textbooks, look up "Van Loan discretization"
Q = sigma_sq * pt.stacklists([[dt ** 4 / 4, dt ** 2 / 2],
                              [dt ** 2 / 2, dt]])

# This is currently "off label" usage, so we have to hack a bit...
kf = StandardFilter()
kf.cov_jitter = 1e-8
kf.n_endog = 1
kf.n_states = 2
kf.n_shocks = 1

# Super important! In Durbin and Koopman, x0 is a *prediction*, so one has to run the filter first.
# In Labbe, x0 is a *filtered value*, so one predicts first.
# Unused return values are: y_hat, F, ll
# BUG FIX: the missing-data mask must be computed from `data` -- the original
# referenced `y`, which is undefined in this scope and raises a NameError.
x_filtered, P_filtered, *_ = kf.update(x, P, data, d, Z, H, pt.isnan(data).all())
x_predicted, P_predicted = kf.predict(x_filtered, P_filtered, c, T, R, Q)

f_filter = pytensor.function([x, P, data, dt, sigma_sq, measurement_sigma_sq],
                             [x_filtered, P_filtered],
                             on_unused_input='ignore')
f_predict = pytensor.function([x, P, data, dt, sigma_sq, measurement_sigma_sq],
                              [x_predicted, P_predicted],
                              on_unused_input='ignore')
Reproduce the “dog tracking” simulation:
import copy
import math
import numpy as np
from numpy.random import randn
def compute_dog_data(z_var, process_var, count=1, dt=1.):
    """Simulate the 1D dog-tracking example.

    Returns (track, measurements) as 1D ndarrays of length ``count``:
    the true positions and the noise-corrupted observations of them.
    """
    position, velocity = 0.0, 1.0
    meas_std = math.sqrt(z_var)
    proc_std = math.sqrt(process_var)

    track, measurements = [], []
    for _ in range(count):
        # Velocity is perturbed by process noise, then integrated once.
        noisy_velocity = velocity + randn() * proc_std
        position += noisy_velocity * dt
        track.append(position)
        # The sensor sees position plus measurement noise.
        measurements.append(position + randn() * meas_std)
    return np.array(track), np.array(measurements)
# Initial belief (Labbe's convention: this is a *filtered* state, so each
# iteration updates first, then predicts).
x_hat = np.array([0, 0])
P_hat = np.diag([500., 49.])

innovation_var = 0.01
measurement_var = 10.0
n_steps = 50
dt_val = 1.0
dt_values = np.full(n_steps, dt_val)

hidden_states, data = compute_dog_data(z_var=measurement_var,
                                       process_var=innovation_var,
                                       count=n_steps,
                                       dt=dt_val)

# Row t holds [filtered mean (2), predicted mean (2)]; covariances are stored
# raveled as [filtered (4), predicted (4)].
state_history = np.zeros((n_steps, 4))
cov_history = np.zeros((n_steps, 8))

# NOTE(review): the loop below fully overwrites row 0, so these two writes are
# effectively dead -- kept to stay faithful to the original script.
state_history[0, 2:] = x_hat
cov_history[0, 4:] = P_hat.ravel()

for step, (observation, step_size) in enumerate(zip(data, dt_values)):
    filtered_mean, filtered_cov = f_filter(x=x_hat, P=P_hat,
                                           data=observation[None],
                                           dt=step_size,
                                           sigma_sq=innovation_var,
                                           measurement_sigma_sq=measurement_var)
    x_hat, P_hat = f_predict(x=filtered_mean, P=filtered_cov,
                             data=observation[None],
                             dt=step_size,
                             sigma_sq=innovation_var,
                             measurement_sigma_sq=measurement_var)
    state_history[step] = np.concatenate([filtered_mean.ravel(), x_hat.ravel()])
    cov_history[step] = np.concatenate([filtered_cov.ravel(), P_hat.ravel()])
Plot the results:
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(14, 4))

# Noisy measurements
ax.plot(dt_values.cumsum(), data)

# Filtered state means are the first two columns of each history row.
mu_filtered = state_history[:, :2]

# BUG FIX: np.diagonal extracts the *variances* from the stored covariance
# matrices; take the square root to get standard deviations, otherwise the
# "2-sigma" band below is actually a 2-variance band and is far too wide
# (or too narrow) whenever the variance differs from 1.
var_filtered = np.diagonal(cov_history[:, :4].reshape((-1, 2, 2)), axis1=1, axis2=2)
std_filtered = np.sqrt(var_filtered)

low = mu_filtered - 2 * std_filtered
high = mu_filtered + 2 * std_filtered

# Filtered position estimate with its 2-sigma band
ax.plot(dt_values.cumsum(), mu_filtered[:, 0])
ax.fill_between(dt_values.cumsum(), low[:, 0], high[:, 0], color='tab:orange', alpha=0.25)

# True (simulated) hidden position for comparison
ax.plot(dt_values.cumsum(), hidden_states)
Result:
For the record, I’d still like it if someone came and worked on optimal linear-quadratic control problems. I think there are interesting applications in, e.g., marketing.
Also, I’d like help making some tools to simplify all this setup — see the linked issue.