I think the best way is to allow for multiple observation, by duplicating state at the same timestep if it has multiple observation
In this example I am trying to model the following regression
y(t) = (a \cdot t + b) + (c(t) \cdot \text{input}(t))
Which I modeled as
Output
Note I’ve tried several method to model time-varying coefficients, but linear spline seems to work the best for multi-observations (other methods either diverge, or converge to incorrect value)
Gist PyMC multiple observations examples · GitHub
Setup Code
import pymc as pm
import arviz as az
import numpy as np
import pandas as pd
import pytensor.tensor as pt
import matplotlib.pyplot as plt
from scipy.signal import savgol_filter
p.random.seed(1331)
t_main = np.linspace(0, 1, 300)
data_df = pd.DataFrame({
"time": t_main,
"trend": 2*t_main + 0.5,
"input_1": 0.2 * np.abs(np.random.normal(0, 1, size=len(t_main)).cumsum())/2,
"input_2": -0.2 * np.cos(t_main/3 + 0.1)**2 * np.abs(np.random.normal(0, 1, size=len(t_main)).cumsum())/3,
"input_3": 0.3 * np.sin(t_main) * np.abs(np.random.normal(0, 1, size=len(t_main)).cumsum())/2,
"coeff": 0.1*savgol_filter(np.abs(np.random.normal(0, 1, size=len(t_main)).cumsum()), 60, 3)/2 + 0.1,
}).assign(**{
"output_1": lambda df: df["coeff"] * df["input_1"],
"output_2": lambda df: df["coeff"] * df["input_2"],
"output_3": lambda df: df["coeff"] * df["input_3"],
})
calc_df = pd.concat([
data_df.iloc[:int(3*len(data_df)//4)][["time", "trend", "input_1", "output_1"]].rename(columns={
"input_1": "input", "output_1": "output"
}),
data_df.iloc[int(len(data_df)//2):][["time", "trend", "input_2", "output_2"]].rename(columns={
"input_2": "input", "output_2": "output"
}),
data_df.iloc[int(len(data_df)//3):int(2*len(data_df)//3)][["time", "trend", "input_3", "output_3"]].rename(columns={
"input_3": "input", "output_3": "output"
})
], axis=0).assign(**{
"obs": lambda df: df["trend"] + df["output"]
}).sort_values("time").reset_index(drop=True)
obs_df = calc_df.drop(columns=["trend", "output"])
fig, axes = plt.subplots(2, 1, figsize=(7, 7), sharex=True)
axes[0].scatter(obs_df["time"], obs_df["obs"])
axes[0].set_title("Observation")
axes[1].plot(data_df["time"], data_df["coeff"])
axes[1].set_title("Time varying coeff")
plt.show()
Modeling
main_t = obs_df[["time"]].drop_duplicates().reset_index(drop=True).reset_index(drop=False)
indexed_obs_df = main_t.merge(obs_df, on="time")
main_t_coords = main_t["time"].to_numpy()
n_knots = 20
changepoints = np.linspace(main_t_coords.min(), main_t_coords.max(), n_knots + 2)[1:-1]
mask = (main_t_coords[:, None] > changepoints) * 1.0
with pm.Model() as model:
slope = pm.Normal("slope", mu=0, sigma=1)
intercept = pm.Normal("intercept", mu=0, sigma=1)
trend = pm.Deterministic("trend", var=(slope * main_t_coords + intercept))
k = pm.Normal("k", 0, 1)
m = pm.Normal("m", 0, 1)
delta = pm.Laplace("delta", 0, 0.1, shape=(len(changepoints), ))
growth = k + pt.dot(mask, delta)
offset = m + pt.dot(mask, -changepoints * delta)
coeff = pm.Deterministic("coeff", var=(growth * main_t_coords + offset))
dup_coord = indexed_obs_df["index"].values
dup_trend = trend[dup_coord]
dup_coeff = coeff[dup_coord]
dup_r_feat = pm.Deterministic("dup_r_feat", var=(dup_coeff * indexed_obs_df.input.values))
mu = pm.Deterministic("mu", var=(dup_trend + dup_r_feat))
# Likelihood
sigma = pm.HalfNormal("sigma", sigma=1)
pm.Normal("likelihood", mu=mu, sigma=sigma, observed=indexed_obs_df.obs)
display(pm.model_to_graphviz(model=model))
Diagnostic Plots
display(az.summary(idata, var_names=["slope", "intercept", "k", "m", "delta", "sigma"]))
az.plot_trace(
data=idata,
var_names=["slope", "intercept", "k", "m", "delta", "sigma"],
compact=True,
backend_kwargs={
"figsize": (12, 15),
"layout": "constrained"
},
);
plt.show()
sample_coeff = az.extract(data=idata, group="posterior", var_names="coeff").to_numpy()
min_coeff, max_coeff = np.quantile(sample_coeff, [0.05, 0.95], axis=1)
mean_coeff = sample_coeff.mean(axis=-1)
posterior_predictive_likelihood = az.extract(
data=ppc,
group="posterior_predictive",
var_names="likelihood",
)
out_df = pd.DataFrame({
"time": obs_df["time"],
"gt": obs_df["obs"],
"pred": posterior_predictive_likelihood.mean(["sample"])
})
fig, axes = plt.subplots(2, 1, figsize=(7, 7))
axes[0].scatter(data_df["time"], data_df["coeff"], marker=".", color="k")
axes[0].fill_between(data_df["time"], min_coeff, max_coeff, alpha=0.3)
axes[0].plot(data_df["time"], mean_coeff)
axes[0].set_title("Time Varying Coefficients")
axes[1].scatter(out_df["time"], out_df["gt"])
axes[1].scatter(out_df["time"], out_df["pred"])
axes[1].set_ylim(np.percentile(out_df["pred"], 0.01) - 0.5, np.percentile(out_df["pred"], 99.9) + 0.5)
axes[1].set_title("Output Predictions")
plt.show()