# Multivariate AR(1) Stochastic Volatility Model

Hi everyone,

I am new to pymc and I have been able to manually implement a generalized univariate AR(1) log volatility model using the following code:

``````import pymc as pm

data = ...

# Model coordinates
coords = {
'dimension': range(1),
'lags': range(-self.AR_SV_lags, 0),
'steps': range(self.T - self.AR_SV_lags),
'T': range(self.T),
}

# Data column for which to estimate the model
eps_col = -1

# Define model
with pm.Model(coords=coords) as SV_model:

# Prior for constant term
mu_prior = pm.Normal('mu', mu=0, sigma=5, dims=('dimension',))

# Prior for AR coefficients
rho_prior = pm.Beta('rho', alpha=1, beta=1.5, dims=('lags',))

constant_prior = pm.Deterministic('constant', mu_prior *
(1 - pm.math.sum(rho_prior)))

# Prior for precision of the innovation term
sigma_prior = pm.Exponential('tau', lam=0.5, shape=1)

# Initial volatility for first time periods
h_init = pm.Exponential('h_init', lam=0.5, dims=('lags',))
h_init = h_init.dimshuffle('x', 0)

# Helper function to define the AR distribution
def ar_dist(h_init, constant, rho, sigma, size):

# Recursion
def step(h_lagged, constant, rho, tau):

# Time-t step
h_t = constant + rho * h_lagged + pm.Normal.dist(0, sigma=sigma)

# Use scan to iterate over the step function
h, _ =\
pytensor.scan(fn=step,
outputs_info=[{'initial': h_init,
'taps': range(-self.AR_SV_lags, 0)}],
non_sequences=[constant, rho, sigma],
n_steps=self.T - self.AR_SV_lags,
strict=True)

return h

# Distribution of logarithmic stochastic volatility
h = pm.CustomDist('ar_dist', h_init, constant_prior,
rho_prior, sigma_prior,
dist=ar_dist, dims=('steps',))

# Concatenate whole sequence of volatilities
h = pm.Deterministic('h', pytensor.tensor.concatenate(
[h_init, h], axis=0), dims=('T',))

# Exponentiate
volatility_process = pm.Deterministic(
'volatility_process', pm.math.exp(h/2))

# Compute structural shocks
if self.dist == 'StudentsT':

# Prior for the degrees of freedom
nu_prior = pm.Exponential('nu', lam=0.1)

# For student shocks
shocks = pm.StudentT('shocks', nu=nu_prior, mu=0,
sigma=volatility_process[:, 0],
observed=self.epsilon[:, eps_col])

else:

# For normal shocks
shocks = pm.Normal('shocks', mu=0,
sigma=volatility_process[:, 0],
observed=self.epsilon[:, eps_col])

# Sample from posterior implied by SV_model using MCMC methods
with SV_model:

trace = pm.sample(1000, tune=2000)
posterior_predictive = pm.sample_posterior_predictive(trace)
``````

where self.AR_SV_lags denotes the number of lags, self.T the length of the time series, self.dist the distribution to use (either Normal or StudentsT), and self.epsilon is a self.T x self.K array collecting self.K time series.

The code above runs well but I would like to extend it to the multivariate case where a vector of volatilities each depend on their own past values (but not the past values of the other volatilities) and the errors are correlated across equations. I tried the following code but keep getting rather cryptic errors regarding the scan implementation:

``````# Model coordinates
coords = {
'dimension': range(self.K),
'lags': range(-self.AR_SV_lags, 0),
'steps': range(self.T - self.AR_SV_lags),
'T': range(self.T),
}

# Define model
with pm.Model(coords=coords) as SV_model:

# Prior for constant term
mu_prior = pm.Normal('mu', mu=0, sigma=5, shape=(self.K,))

# Prior for AR coefficients
rho_prior = pm.Beta('rho', alpha=1, beta=1.5,
shape=(self.K,))

constant_prior = pm.Deterministic('constant', mu_prior *
(1 - pm.math.sum(rho_prior)))

# Prior for precision of the innovation term
sigma_prior = pm.Exponential('tau', lam=0.5, shape=(self.K, self.K))

# Initial volatility for first time periods
h_init = pm.Exponential('h_init', lam=0.5, shape=(self.AR_SV_lags, self.K))

# Helper function to define the AR distribution
def ar_dist(h_init, constant, rho, sigma, size):

# Recursion
def step(h_lagged, constant, rho, sigma):

# Time-t step
h_t = constant + (h_lagged * rho) +\
pm.Normal.dist(0, sigma=sigma, shape=(1, self.K))
h_t_squeeze = h_t[1]

# Use scan to iterate over the step function
h, _ =\
pytensor.scan(fn=step,
outputs_info=[{'initial': h_init,
'taps': range(-self.AR_SV_lags, 0)}],
non_sequences=[constant, rho, sigma],
n_steps=self.T - self.AR_SV_lags,
strict=True)

return h

# Distribution of logarithmic stochastic volatility
h = pm.CustomDist('ar_dist', h_init, constant_prior,
rho_prior, sigma_prior,
dist=ar_dist, dims=('steps', 'dimension'))

# Concatenate whole sequence of volatilities
h = pm.Deterministic('h', pytensor.tensor.concatenate(
[h_init.reshape((1, self.K)), h], axis=0), dims=('T',))

# Exponentiate
volatility_process = pm.Deterministic(
'volatility_process', pm.math.exp(h/2))

# Compute structural shocks
if self.dist == 'StudentsT':

# Prior for the degrees of freedom
nu_prior = pm.Exponential('nu', lam=0.1)

# For student shocks
shocks = pm.StudentT('shocks', nu=nu_prior, mu=np.zeros(self.K),
sigma=volatility_process,
observed=self.epsilon,
dims=('steps', 'dimension'))

else:

# For normal shocks
shocks = pm.Normal('shocks', mu=np.zeros(self.K),
sigma=volatility_process,
observed=self.epsilon,
dims=('steps', 'dimension'))

# Sample from posterior implied by SV_model using MCMC methods
# nuts_sampler='nutpie',
with SV_model:

trace = pm.sample(1000, tune=2000)
posterior_predictive = pm.sample_posterior_predictive(trace)
``````

The code above generates the following error:

File ~\AppData\Roaming\Python\Python311\site-packages\pymc\logprob\scan.py:309 in logprob_ScanRV
var_indices, rv_vars, io_vars = zip(*rv_outer_outs)

ValueError: not enough values to unpack (expected 3, got 0)

Any ideas on how to adapt the univariate code to the multivariate case? Thanks in advance!

Just to add that I already checked the (excellent) notebooks on implementing AR/MA models given at this address and which were very helpful in building the code I shared above for the univariate AR(1) stochastic volatility model. I also checked some of the test codes available here but none used scan to implement multivariate time series models.

Iâ€™m a bit confused by what you mean by multivariate in this context. Do you just want to fit several independent time series at the same time, e.g. by adding a batch dimension? The only wrinkle there is that the time dimension always has to be the left-most dimension in the scan, which is confusing since thatâ€™s usually where we think of the batch dimension going. Hereâ€™s an example:

``````import pytensor
import pytensor.tensor as pt
import pymc as pm
import arviz as az

lags = 2  # Number of lags
trials = 100  # Time series length
n_timeseries = 5

def ar_dist(ar_init, rho, sigma, size):
def ar_step(x_tm2, x_tm1, rho, sigma):
mu = x_tm1 * rho[0] + x_tm2 * rho[1]
x = mu + pm.Normal.dist(sigma=sigma)

ar_innov, _ = pytensor.scan(
fn=ar_step,
outputs_info=[{"initial": ar_init, "taps": range(-lags, 0)}],
non_sequences=[rho, sigma],
n_steps=trials - lags,
strict=True,
)

return ar_innov

coords = {
"lags": range(-lags, 0),
"steps": range(trials - lags),
"trials": range(trials),
"batch": range(n_timeseries)
}
with pm.Model(coords=coords, check_bounds=False) as batch_model:
rho = pm.Normal(name="rho", mu=0, sigma=0.2, dims=("lags", 'batch'))
sigma = pm.HalfNormal(name="sigma", sigma=0.2, dims=('batch'))

ar_init = pm.Normal(name="ar_init", dims=("lags", 'batch'))

ar_innov = pm.CustomDist(
"ar_dist",
ar_init,
rho,
sigma,
dist=ar_dist,
dims=("steps", 'batch'),
)

ar = pm.Deterministic(
name="ar", var=pt.concatenate([ar_init, ar_innov], axis=0), dims=('trials', "batch")
)
``````

With a parameter recovery exercise:

``````import matplotlib.pyplot as plt

with batch_model:
prior_idata = pm.sample_prior_predictive()

test_data = prior_idata.prior.ar_dist.sel(chain=0, draw=0).values
with pm.observe(batch_model, {'ar_dist': test_data}):
idata = pm.sample()

true_rhos = prior_idata.prior.rho.sel(chain=0, draw=0).values
az.plot_posterior(idata, var_names=['rho'], coords={'lags':-1}, ref_val=true_rhos[1].tolist(), grid=(1,5));
``````

One nitpick on nomenclature. Passing the volatility process to a distribution via the `sigma` parameter doesnâ€™t make shocks, it makes a kind of time-varying measurement error model. Your shocks are already defined in the scan model by the `pm.Normal.dist` term, which you could change to be Student T if you wanted fat-tailed innovations. Iâ€™m not sure these terms are needed at all.

1 Like

Hi Jesse,