Multivariate AR(1) Stochastic Volatility Model

Hi everyone,

I am new to pymc and I have been able to manually implement a generalized univariate AR(1) log volatility model using the following code:

import pymc as pm

data = ...

# Model coordinates
coords = {
    'dimension': range(1),
    'lags': range(-self.AR_SV_lags, 0),
    'steps': range(self.T - self.AR_SV_lags),
    'T': range(self.T),
}

# Data column for which to estimate the model
eps_col = -1

# Define model
with pm.Model(coords=coords) as SV_model:

        # Prior for constant term
        mu_prior = pm.Normal('mu', mu=0, sigma=5, dims=('dimension',))
        
        # Prior for AR coefficients
        rho_prior = pm.Beta('rho', alpha=1, beta=1.5, dims=('lags',))
        
        # Additive constant term prior
        constant_prior = pm.Deterministic('constant', mu_prior *
                                          (1 - pm.math.sum(rho_prior)))
        
        # Prior for precision of the innovation term
        sigma_prior = pm.Exponential('tau', lam=0.5, shape=1)
        
        # Initial volatility for first time periods
        h_init = pm.Exponential('h_init', lam=0.5, dims=('lags',))
        h_init = h_init.dimshuffle('x', 0)
        
        # Helper function to define the AR distribution
        def ar_dist(h_init, constant, rho, sigma, size):
        
            # Recursion
            def step(h_lagged, constant, rho, tau):
        
                # Time-t step
                h_t = constant + rho * h_lagged + pm.Normal.dist(0, sigma=sigma)
        
                return h_t, collect_default_updates([h_t])
        
            # Use scan to iterate over the step function
            h, _ =\
                pytensor.scan(fn=step,
                              outputs_info=[{'initial': h_init,
                                             'taps': range(-self.AR_SV_lags, 0)}],
                              non_sequences=[constant, rho, sigma],
                              n_steps=self.T - self.AR_SV_lags,
                              strict=True)
        
            return h
        
        # Distribution of logarithmic stochastic volatility
        h = pm.CustomDist('ar_dist', h_init, constant_prior,
                          rho_prior, sigma_prior,
                          dist=ar_dist, dims=('steps',))
        
        # Concatenate whole sequence of volatilities
        h = pm.Deterministic('h', pytensor.tensor.concatenate(
            [h_init, h], axis=0), dims=('T',))
        
        # Exponentiate
        volatility_process = pm.Deterministic(
            'volatility_process', pm.math.exp(h/2))
        
        # Compute structural shocks
        if self.dist == 'StudentsT':
        
            # Prior for the degrees of freedom
            nu_prior = pm.Exponential('nu', lam=0.1)
        
            # For student shocks
            shocks = pm.StudentT('shocks', nu=nu_prior, mu=0,
                                 sigma=volatility_process[:, 0],
                                 observed=self.epsilon[:, eps_col])
        
        else:
        
            # For normal shocks
            shocks = pm.Normal('shocks', mu=0,
                               sigma=volatility_process[:, 0],
                               observed=self.epsilon[:, eps_col])

# Sample from posterior implied by SV_model using MCMC methods
with SV_model:

        trace = pm.sample(1000, tune=2000)
        posterior_predictive = pm.sample_posterior_predictive(trace)

where self.AR_SV_lags denotes the number of lags, self.T the length of the time series, self.dist the distribution to use (either Normal or StudentsT), and self.epsilon is a self.T x self.K array collecting self.K time series.

The code above runs well but I would like to extend it to the multivariate case where a vector of volatilities each depend on their own past values (but not the past values of the other volatilities) and the errors are correlated across equations. I tried the following code but keep getting rather cryptic errors regarding the scan implementation:

# Model coordinates
coords = {
    'dimension': range(self.K),
    'lags': range(-self.AR_SV_lags, 0),
    'steps': range(self.T - self.AR_SV_lags),
    'T': range(self.T),
}

# Define model
with pm.Model(coords=coords) as SV_model:

    # Prior for constant term
    mu_prior = pm.Normal('mu', mu=0, sigma=5, shape=(self.K,))

    # Prior for AR coefficients
    rho_prior = pm.Beta('rho', alpha=1, beta=1.5,
                        shape=(self.K,))

    # Additive constant term prior
    constant_prior = pm.Deterministic('constant', mu_prior *
                                      (1 - pm.math.sum(rho_prior)))

    # Prior for precision of the innovation term
    sigma_prior = pm.Exponential('tau', lam=0.5, shape=(self.K, self.K))

    # Initial volatility for first time periods
    h_init = pm.Exponential('h_init', lam=0.5, shape=(self.AR_SV_lags, self.K))

    # Helper function to define the AR distribution
    def ar_dist(h_init, constant, rho, sigma, size):

        # Recursion
        def step(h_lagged, constant, rho, sigma):

            # Time-t step
            h_t = constant + (h_lagged * rho) +\
                pm.Normal.dist(0, sigma=sigma, shape=(1, self.K))
            h_t_squeeze = h_t[1]

            return h_t_squeeze, collect_default_updates([h_t_squeeze])

        # Use scan to iterate over the step function
        h, _ =\
            pytensor.scan(fn=step,
                          outputs_info=[{'initial': h_init,
                                         'taps': range(-self.AR_SV_lags, 0)}],
                          non_sequences=[constant, rho, sigma],
                          n_steps=self.T - self.AR_SV_lags,
                          strict=True)

        return h

    # Distribution of logarithmic stochastic volatility
    h = pm.CustomDist('ar_dist', h_init, constant_prior,
                      rho_prior, sigma_prior,
                      dist=ar_dist, dims=('steps', 'dimension'))

    # Concatenate whole sequence of volatilities
    h = pm.Deterministic('h', pytensor.tensor.concatenate(
        [h_init.reshape((1, self.K)), h], axis=0), dims=('T',))

    # Exponentiate
    volatility_process = pm.Deterministic(
        'volatility_process', pm.math.exp(h/2))

    # Compute structural shocks
    if self.dist == 'StudentsT':

        # Prior for the degrees of freedom
        nu_prior = pm.Exponential('nu', lam=0.1)

        # For student shocks
        shocks = pm.StudentT('shocks', nu=nu_prior, mu=np.zeros(self.K),
                              sigma=volatility_process,
                              observed=self.epsilon,
                              dims=('steps', 'dimension'))

    else:

        # For normal shocks
        shocks = pm.Normal('shocks', mu=np.zeros(self.K),
                            sigma=volatility_process,
                            observed=self.epsilon,
                            dims=('steps', 'dimension'))

# Sample from posterior implied by SV_model using MCMC methods
# nuts_sampler='nutpie',
with SV_model:

    trace = pm.sample(1000, tune=2000)
    posterior_predictive = pm.sample_posterior_predictive(trace)

The code above generates the following error:

File ~\AppData\Roaming\Python\Python311\site-packages\pymc\logprob\scan.py:309 in logprob_ScanRV
var_indices, rv_vars, io_vars = zip(*rv_outer_outs)

ValueError: not enough values to unpack (expected 3, got 0)

Any ideas on how to adapt the univariate code to the multivariate case? Thanks in advance!

Just to add that I already checked the (excellent) notebooks on implementing AR/MA models given at this address and which were very helpful in building the code I shared above for the univariate AR(1) stochastic volatility model. I also checked some of the test codes available here but none used scan to implement multivariate time series models.

I’m a bit confused by what you mean by multivariate in this context. Do you just want to fit several independent time series at the same time, e.g. by adding a batch dimension? The only wrinkle there is that the time dimension always has to be the left-most dimension in the scan, which is confusing since that’s usually where we think of the batch dimension going. Here’s an example:

import pytensor
import pytensor.tensor as pt
import pymc as pm
from pymc.pytensorf import collect_default_updates
import arviz as az

lags = 2  # Number of lags
trials = 100  # Time series length
n_timeseries = 5

def ar_dist(ar_init, rho, sigma, size):
    def ar_step(x_tm2, x_tm1, rho, sigma):
        mu = x_tm1 * rho[0] + x_tm2 * rho[1]
        x = mu + pm.Normal.dist(sigma=sigma)
        return x, collect_default_updates([x])

    ar_innov, _ = pytensor.scan(
        fn=ar_step,
        outputs_info=[{"initial": ar_init, "taps": range(-lags, 0)}],
        non_sequences=[rho, sigma],
        n_steps=trials - lags,
        strict=True,
    )

    return ar_innov

coords = {
    "lags": range(-lags, 0),
    "steps": range(trials - lags),
    "trials": range(trials),
    "batch": range(n_timeseries)
}
with pm.Model(coords=coords, check_bounds=False) as batch_model:
    rho = pm.Normal(name="rho", mu=0, sigma=0.2, dims=("lags", 'batch'))
    sigma = pm.HalfNormal(name="sigma", sigma=0.2, dims=('batch'))

    ar_init = pm.Normal(name="ar_init", dims=("lags", 'batch'))

    ar_innov = pm.CustomDist(
        "ar_dist",
        ar_init,
        rho,
        sigma,
        dist=ar_dist,
        dims=("steps", 'batch'),
    )

    ar = pm.Deterministic(
        name="ar", var=pt.concatenate([ar_init, ar_innov], axis=0), dims=('trials', "batch")
    )

With a parameter recovery exercise:

import matplotlib.pyplot as plt

with batch_model:
    prior_idata = pm.sample_prior_predictive()
    
test_data = prior_idata.prior.ar_dist.sel(chain=0, draw=0).values
with pm.observe(batch_model, {'ar_dist': test_data}):
    idata = pm.sample()
    
true_rhos = prior_idata.prior.rho.sel(chain=0, draw=0).values
az.plot_posterior(idata, var_names=['rho'], coords={'lags':-1}, ref_val=true_rhos[1].tolist(), grid=(1,5));

One nitpick on nomenclature. Passing the volatility process to a distribution via the sigma parameter doesn’t make shocks, it makes a kind of time-varying measurement error model. Your shocks are already defined in the scan model by the pm.Normal.dist term, which you could change to be Student T if you wanted fat-tailed innovations. I’m not sure these terms are needed at all.

1 Like

Hi Jesse,

Thank you for your answer that was very helpful!

To clarify, I want to model “independent” AR(1) equations for latent volatilities with correlated latent errors, hence why I mentioned in my post that each variance only depends on its own lags and not the lags of the other variances, but why the model still is multivariate. Therefore, I think I would need to amend the line for sigma which should be a batch times batch matrix (since the latent errors across equations are allowed to be correlated).

For the nomenclature: point taken! To provide some additional context, I am actually modelling the conditional variances of the structural shocks of a structural vector autoregression, where I recovered the said structural shocks using a predefined identification strategy. Hence why I refer to the errors in the AR(1) equations (the ones generated by pm.Normal.dist(sigma=sigma)) as “latent errors” while the final variable I recover is named “shocks” in reference to the structural shocks, I hope this clarifies!