Hi everyone,
I am new to pymc and I have been able to manually implement a generalized univariate AR(1) log volatility model using the following code:
import pymc as pm
data = ...
# Model coordinates
coords = {
'dimension': range(1),
'lags': range(-self.AR_SV_lags, 0),
'steps': range(self.T - self.AR_SV_lags),
'T': range(self.T),
# Data column for which to estimate the model
eps_col = -1
# Define model
with pm.Model(coords=coords) as SV_model:
# Prior for constant term
mu_prior = pm.Normal('mu', mu=0, sigma=5, dims=('dimension',))
# Prior for AR coefficients
rho_prior = pm.Beta('rho', alpha=1, beta=1.5, dims=('lags',))
# Additive constant term prior
constant_prior = pm.Deterministic('constant', mu_prior *
(1 - pm.math.sum(rho_prior)))
# Prior for precision of the innovation term
sigma_prior = pm.Exponential('tau', lam=0.5, shape=1)
# Initial volatility for first time periods
h_init = pm.Exponential('h_init', lam=0.5, dims=('lags',))
h_init = h_init.dimshuffle('x', 0)
# Helper function to define the AR distribution
def ar_dist(h_init, constant, rho, sigma, size):
# Recursion
def step(h_lagged, constant, rho, tau):
# Time-t step
h_t = constant + rho * h_lagged + pm.Normal.dist(0, sigma=sigma)
return h_t, collect_default_updates([h_t])
# Use scan to iterate over the step function
h, _ =\
outputs_info=[{'initial': h_init,
'taps': range(-self.AR_SV_lags, 0)}],
non_sequences=[constant, rho, sigma],
n_steps=self.T - self.AR_SV_lags,
return h
# Distribution of logarithmic stochastic volatility
h = pm.CustomDist('ar_dist', h_init, constant_prior,
rho_prior, sigma_prior,
dist=ar_dist, dims=('steps',))
# Concatenate whole sequence of volatilities
h = pm.Deterministic('h', pytensor.tensor.concatenate(
[h_init, h], axis=0), dims=('T',))
# Exponentiate
volatility_process = pm.Deterministic(
'volatility_process', pm.math.exp(h/2))
# Compute structural shocks
if self.dist == 'StudentsT':
# Prior for the degrees of freedom
nu_prior = pm.Exponential('nu', lam=0.1)
# For student shocks
shocks = pm.StudentT('shocks', nu=nu_prior, mu=0,
sigma=volatility_process[:, 0],
observed=self.epsilon[:, eps_col])
# For normal shocks
shocks = pm.Normal('shocks', mu=0,
sigma=volatility_process[:, 0],
observed=self.epsilon[:, eps_col])
# Sample from posterior implied by SV_model using MCMC methods
with SV_model:
trace = pm.sample(1000, tune=2000)
posterior_predictive = pm.sample_posterior_predictive(trace)
where self.AR_SV_lags denotes the number of lags, self.T the length of the time series, self.dist the distribution to use (either Normal or StudentsT), and self.epsilon is a self.T x self.K array collecting self.K time series.
The code above runs well but I would like to extend it to the multivariate case where a vector of volatilities each depend on their own past values (but not the past values of the other volatilities) and the errors are correlated across equations. I tried the following code but keep getting rather cryptic errors regarding the scan implementation:
# Model coordinates
coords = {
'dimension': range(self.K),
'lags': range(-self.AR_SV_lags, 0),
'steps': range(self.T - self.AR_SV_lags),
'T': range(self.T),
# Define model
with pm.Model(coords=coords) as SV_model:
# Prior for constant term
mu_prior = pm.Normal('mu', mu=0, sigma=5, shape=(self.K,))
# Prior for AR coefficients
rho_prior = pm.Beta('rho', alpha=1, beta=1.5,
# Additive constant term prior
constant_prior = pm.Deterministic('constant', mu_prior *
(1 - pm.math.sum(rho_prior)))
# Prior for precision of the innovation term
sigma_prior = pm.Exponential('tau', lam=0.5, shape=(self.K, self.K))
# Initial volatility for first time periods
h_init = pm.Exponential('h_init', lam=0.5, shape=(self.AR_SV_lags, self.K))
# Helper function to define the AR distribution
def ar_dist(h_init, constant, rho, sigma, size):
# Recursion
def step(h_lagged, constant, rho, sigma):
# Time-t step
h_t = constant + (h_lagged * rho) +\
pm.Normal.dist(0, sigma=sigma, shape=(1, self.K))
h_t_squeeze = h_t[1]
return h_t_squeeze, collect_default_updates([h_t_squeeze])
# Use scan to iterate over the step function
h, _ =\
outputs_info=[{'initial': h_init,
'taps': range(-self.AR_SV_lags, 0)}],
non_sequences=[constant, rho, sigma],
n_steps=self.T - self.AR_SV_lags,
return h
# Distribution of logarithmic stochastic volatility
h = pm.CustomDist('ar_dist', h_init, constant_prior,
rho_prior, sigma_prior,
dist=ar_dist, dims=('steps', 'dimension'))
# Concatenate whole sequence of volatilities
h = pm.Deterministic('h', pytensor.tensor.concatenate(
[h_init.reshape((1, self.K)), h], axis=0), dims=('T',))
# Exponentiate
volatility_process = pm.Deterministic(
'volatility_process', pm.math.exp(h/2))
# Compute structural shocks
if self.dist == 'StudentsT':
# Prior for the degrees of freedom
nu_prior = pm.Exponential('nu', lam=0.1)
# For student shocks
shocks = pm.StudentT('shocks', nu=nu_prior, mu=np.zeros(self.K),
dims=('steps', 'dimension'))
# For normal shocks
shocks = pm.Normal('shocks', mu=np.zeros(self.K),
dims=('steps', 'dimension'))
# Sample from posterior implied by SV_model using MCMC methods
# nuts_sampler='nutpie',
with SV_model:
trace = pm.sample(1000, tune=2000)
posterior_predictive = pm.sample_posterior_predictive(trace)
The code above generates the following error:
File ~\AppData\Roaming\Python\Python311\site-packages\pymc\logprob\scan.py:309 in logprob_ScanRV
var_indices, rv_vars, io_vars = zip(*rv_outer_outs)
ValueError: not enough values to unpack (expected 3, got 0)
Any ideas on how to adapt the univariate code to the multivariate case? Thanks in advance!