Also it’s recommended to avoid using the model.register_rv
API, since it’s not guaranteed to be maintained going forward. The new way is to use pm.CustomDist
, passing a function that returns the results of your scan as the dist
. For your model, it would look like this:
from pymc.pytensorf import collect_default_updates, get_mode
def step(*args):
y_tm1, l_tm1, b_tm1, alpha, beta, phi, sigma = args
l = alpha * y_tm1 + (1 - alpha) * (l_tm1 + phi * b_tm1) # update the level
b = beta * (l - l_tm1) + (1 - beta) * phi * b_tm1 # update the trend
mu = l + phi * b # forecast
y = pm.Normal.dist(mu=mu, sigma=sigma)
return (l, b, y), collect_default_updates(inputs=args, outputs=[y])
def holt_winters(alpha, beta, phi, sigma, l0, b0, size, mode='JAX'):
(l, b, y), updates = pytensor.scan(fn=step,
sequences=[y_obs],
outputs_info=[l0, b0, None],
non_sequences=[alpha, beta, phi, sigma],
strict=True,
n_steps=size[0],
mode=get_mode(mode))
return y
with pm.Model() as model:
# Observed Data
y_obs = pm.MutableData('y_obs', Y)
alpha = pm.Uniform('alpha', 0, 1)
beta = pm.Uniform('beta', 0, 1)
phi = pm.Uniform('phi', 0.8, 0.98)
sigma = pm.HalfNormal('sigma', 0.1) # variance
l0 = pm.Normal('l0', 0, 0.1) # initial level
b0 = pm.Normal('b0', 0, 0.1) # initial trend
y_pred = pm.CustomDist('obs', alpha, beta, phi, sigma, l0, b0, dist=holt_winters, observed=y_obs)
idata = pm.sample(nuts_sampler='numpyro', nuts_sampler_kwargs={'chain_method':'vectorized'})