Hierarchical HMM with dependent transition matrices

I would like to implement a hierarchical HMM with the following structure.

  • There are two layers of hidden states, n_i and q_i, and one layer of observed emissions V_i.
  • There are two possible values for the n_i, two possible values for the q_i, and two possible values for the V_i.
  • There is a time-independent transition matrix T_n governing the hidden states n_i.
  • The value of n_i acts as a switch that selects one of two possible transition matrices, T_q,0 or T_q,1 (i.e., T_q,n_i), governing the hidden states q_i (see the generative sketch below).
  • There is a time-independent emission matrix E that governs the emitted values V_i.

Is it possible to specify such a model in PyMC?
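
For concreteness, the generative structure I have in mind is the following (writing T_q,n for the q transition matrix selected by n, and assuming, as in my scan step below, that the matrix governing q_i is selected by the previous value n_{i-1}):

n_0 ~ Categorical(pi_n),   q_0 ~ Categorical(pi_q)
n_i | n_{i-1} ~ Categorical(T_n[n_{i-1}, :])
q_i | q_{i-1}, n_{i-1} ~ Categorical(T_q,n_{i-1}[q_{i-1}, :])
V_i | q_i ~ Categorical(E[q_i, :])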

My latest attempt is based on threads like Adding Conditional Transition Probabilities to a Hidden Markov Model - #4 by jessegrabowski, which define a CustomDist(…, dist=markov_chain) and a markov_chain() function that calls a scan routine. This attempt tries to handle both the n and the q layers within the same scan routine.

# Assumed imports: import numpy as np; import pytensor as pt; import pymc as pm;
# from pymc.pytensorf import collect_default_updates
with pm.Model() as model:
    def step(P_n_stack, P_nq_stack,
                  x_tm1):
        """
        step(sequences,
             outputs_info,
             non_sequences)
        """
        x_tm1_n, x_tm1_q = x_tm1
        x_t_n = pm.Categorical.dist(p=P_n_stack[0,x_tm1_n])
        x_t_q = pm.Categorical.dist(p=P_nq_stack[0,x_tm1_n,x_tm1_q])
        x_t = pt.tensor.as_tensor([x_t_n, x_t_q])
        return x_t, collect_default_updates(x_t)
    
    def markov_chain(x0, P_n_stack, P_nq_stack, shape=None):
        states, _ = pt.scan(fn=step,
                            sequences=[P_n_stack, P_nq_stack],
                            outputs_info=[x0],
                            non_sequences=None)
        return states
    
    # T_n_stack is shape (steps, num_states_n, num_states_n) = (steps, 2, 2).
    # It is made by repeating the `T_n` matrix `steps` times.
    # T_nq_stack is shape (steps, num_states_n, num_states_q, num_states_q) = (steps, 2, 2, 2).
    # It is made by concatenating the `T_q,0` and `T_q,1` matrices and then repeating that concatenation `steps` times.
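    # For example (an assumed construction; T_n, T_q0, T_q1 would be (2, 2) matrices defined earlier):
    #   T_n_stack = pt.tensor.repeat(T_n[None], steps, axis=0)
    #   T_nq_stack = pt.tensor.repeat(pt.tensor.stack([T_q0, T_q1])[None], steps, axis=0)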
    x0_n = pm.Categorical.dist(p=pi_n.squeeze())
    x0_q = pm.Categorical.dist(p=pi_q.squeeze())
    x0_nq = pt.tensor.as_tensor([x0_n, x0_q])
    xi_nq = pm.CustomDist(
        "xi_nq",
        x0_nq,
        T_n_stack,
        T_nq_stack,
        dist=markov_chain
    )

    # Emissions and likelihood function not shown

I have another attempt in which I separately define a CustomDist for the n Markov chain and q Markov chain, and use the outputs of the n Markov chain to select the series of T_q,n_i matrices that are used in the q Markov chain.

with pm.Model() as model:
    def step(P, 
             x_tm1):
        """
        step(sequences,
             outputs_info,
             non_sequences)
        """
        x_t = pm.Categorical.dist(p=P[x_tm1])
        return x_t, collect_default_updates(x_t)
    
    def markov_chain(x0, Ps, shape=None):
        states, _ = pt.scan(fn=step,
                            sequences=[Ps],
                            outputs_info=[x0],
                            non_sequences=None)
        return states
    
    # T_n_stack is shape (steps, num_states_n, num_states_n) = (steps, 2, 2).
    # It is made by repeating the `T_n` matrix `steps` times.
    x0_n = pm.Categorical.dist(p=pi_n.squeeze())
    xi_n = pm.CustomDist(
        "xi_n",
        x0_n,
        T_n_stack,
        dist=markov_chain
    )
    
    # T_nq_stack is shape (steps, num_states_n, num_states_q, num_states_q) = (steps, 2, 2, 2).
    # It is made by concatenating the `T_q,0` and `T_q,1` matrices and then repeating that concatenation `steps` times.
    T_q_stack = pt.tensor.as_tensor(T_nq_stack[:,xi_n,:,:])
    T_q_stack = pt.tensor.reshape(T_q_stack, (n_steps,2,2)) # Need to drop the size-1 dimension
    x0_q = pm.Categorical.dist(p=pi_q.squeeze())
    xi_q = pm.CustomDist(
        "xi_q",
        x0_q,
        T_q_stack,
        dist=markov_chain
    )

    # Emissions and likelihood function not shown

Both of the above attempts raise the following error when sampled.

ValueError: Random variables detected in the logp graph: {categorical_rv{"(p)->()"}.out, categorical_rv{"(p)->()"}.out}.
This can happen when mixing variables from different models, or when CustomDist logp or Interval transform functions reference nonlocal variables.

If the two possible n states and the two possible q states were to be joined together into four possible joint states 0:(0,0), 1:(0,1), 2:(1,0), 3:(1,1), then I think the model I am seeking to implement is similar to that of Fig. 6a of the IEEE paper “An introduction to hidden Markov models” (IEEE Xplore). However, I am specifically trying to set up the PyMC model hierarchically.

I am able to get past the ValueError: Random variables detected in the logp graph error by replacing the calls to the unnamed distributions pm.Categorical.dist() for the initial states with hard-coded integers. I believe I also fixed an indexing issue in the creation of T_q_stack in the “separately define a CustomDist for the n Markov chain and q Markov chain” attempt. Here is the latest code.

with pm.Model() as model:
    def step(P, 
             x_tm1):
        """
        step(sequences,
             outputs_info,
             non_sequences)
        """
        x_t = pm.Categorical.dist(p=P[x_tm1])
        return x_t, collect_default_updates(x_t)
    
    def markov_chain(x0, Ps, shape=None):
        states, _ = pt.scan(fn=step,
                            sequences=[Ps],
                            outputs_info=[x0],
                            non_sequences=None)
        return states
    
    # T_n_stack is shape (num_steps, num_states_n, num_states_n) = (51, 2, 2).
    # It is made by repeating the `T_n` matrix `steps` times.
    x0_n = np.int64(0) # pm.Categorical.dist(p=pi_n.squeeze())
    xi_n = pm.CustomDist(
        "xi_n",
        x0_n,
        T_n_stack,
        dist=markov_chain
    )
    
    # T_nq_stack is shape (num_steps, num_states_n, num_states_q, num_states_q) = (51, 2, 2, 2).
    # It is made by concatenating the `T_q,0` and `T_q,1` matrices and then repeating that concatenation `steps` times.
    num_steps = T_nq_stack.shape[0]
    T_q_stack = T_nq_stack[pt.tensor.arange(num_steps), xi_n, :, :]
    x0_q = np.int64(1) # pm.Categorical.dist(p=pi_q.squeeze())
    xi_q = pm.CustomDist(
        "xi_q",
        x0_q,
        T_q_stack,
        dist=markov_chain
    )

    # Emissions and likelihood function not shown

With this code, I am able to call pm.sample, which begins to run, but then hits the following error.

ParallelSamplingError: Chain 0 failed with: index out of bounds
Apply node that caused the error: Subtensor{i}(*1-<Matrix(float64, shape=(2, 2))>, ScalarFromTensor.0)
Toposort index: 4
Inputs types: [TensorType(float64, shape=(2, 2)), ScalarType(int64)]
Inputs shapes: [(2, 2), ()]
Inputs strides: [(16, 8), ()]
Inputs values: [array([[9.90098030e-01, 9.90197049e-03],
       [1.00999899e-04, 9.99899000e-01]]), np.int64(2)]
Outputs clients: [[Le(Subtensor{i}.0, [1]), Ge(Subtensor{i}.0, [0]), Sum{axes=None}(Subtensor{i}.0), AdvancedSubtensor1(Subtensor{i}.0, Clip.0)]]

(Also, note a typo in the initial post’s sketch of the DAG—the bottom row’s label should read “Observed emissions V_i in {0,1}”)

Is there a way you could write a graphical model out where the nodes represent random variables such that each variable is conditionally independent given the nodes that point into it? Then it’ll be easy to see what the notation means in terms of defining a likelihood and where it fits into the HMM literature, which is vast.

Alternatively, is there a way to combine the hidden states n and q into a single regular HMM? Then you can apply all the usual HMM tricks like the forward algorithm for the likelihood or forward-backward to fully marginalize for even more efficient training. I don’t know what the state of that is in PyMC, but if there’s an HMM likelihood then it should be able to at least use the forward algorithm to efficiently calculate likelihoods.

Thanks for your reply, @bob-carpenter.

Alternatively, is there a way to combine the hidden states n and q into a single regular HMM? Then you can apply all the usual HMM tricks like the forward algorithm for the likelihood or forward-backward to fully marginalize for even more efficient training. I don’t know what the state of that is in PyMC, but if there’s an HMM likelihood then it should be able to at least use the forward algorithm to efficiently calculate likelihoods.

If the two possible n states and the two possible q states were to be joined together into four possible joint states 0:(0,0), 1:(0,1), 2:(1,0), 3:(1,1), then the model I am seeking to implement is similar to that of Fig. 6a of the IEEE paper linked above.

I have successfully implemented this “joint state” model in Pomegranate, which offers convenient access to the forward-backward algorithm. However, I am ultimately working toward a larger and more complex model, for which a more expressive language like PyMC or NumPyro seemed better suited and easier to scale. As an example, the single transition matrix of the “joint state” model has 4x4=16 elements, but in my case those elements aren't all independent, so PyMC could help me pool some of them and reduce the number of parameters. That is what I was trying to accomplish by splitting the 4x4 matrix into a two-layer hierarchical model which still encodes 16 possible transitions, but with fewer parameters.
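
For example, the pooled joint matrix I have in mind could be assembled like this (a numpy sketch with made-up numbers; it assumes the matrix governing q_i is selected by the previous state n_{i-1}, and uses joint state s = 2*n + q):

import numpy as np

T_n = np.array([[0.9, 0.1],
                [0.2, 0.8]])              # 2 free parameters (rows sum to 1)
T_q = np.array([[[0.7, 0.3],              # T_q[0]: q transitions when n_{i-1} = 0
                 [0.4, 0.6]],
                [[0.5, 0.5],              # T_q[1]: q transitions when n_{i-1} = 1
                 [0.1, 0.9]]])            # 4 free parameters

# P[(n, q), (n', q')] = T_n[n, n'] * T_q[n, q, q'], flattened so that s = 2*n + q
P_joint = (T_n[:, None, :, None] * T_q[:, :, None, :]).reshape(4, 4)
assert np.allclose(P_joint.sum(axis=1), 1.0)   # rows are still proper distributions
# 6 free parameters here versus 12 for an unconstrained 4x4 stochastic matrix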

Is there a way you could write a graphical model out where the nodes represent random variables such that each variable is conditionally independent given the nodes that point into it? Then it’ll be easy to see what the notation means in terms of defining a likelihood and where it fits into the HMM literature, which is vast.

I hope the following diagram can help to answer your question. To the right of the diagram, I’ve sketched how each n_i and q_i node is distributed depending on the previous nodes that point into them. For q_i I tried mocking up a “switch distribution” (relevant link: Combining models using switch - #2 by ricardoV94). I’ve also included a likelihood based on the emissions V_i.

I appreciate your help!

You don’t need switch here, you can replace pm.math.switch(n[i-1], T[q, 1], T[q, 0]) with direct indexing, T[q, n[i-1]].
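
For example, with T stacked so that T[q, n] is the transition row for current state q under switching state n (toy numbers, just to illustrate the equivalence):

import numpy as np
import pymc as pm

T = np.array([[[0.9, 0.1], [0.2, 0.8]],
              [[0.6, 0.4], [0.3, 0.7]]])
q, n_prev = 1, 0
row_via_switch = pm.math.switch(n_prev, T[q, 1], T[q, 0]).eval()   # the switch form
row_via_index = T[q, n_prev]                                       # direct indexing
assert np.allclose(row_via_switch, row_via_index)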

The new diagram helps. So the n is just a Markov sequence, like in a standard HMM. The qs are conditional on the n, but not in a standard HMM way, in that they also depend on what came before. How is q_0 generated? I'm a little unclear on the V_i distribution: what is E[q_i] and what is observed? I was confused by the V_i being labeled as “observed emissions” and then observed actually showing up.

You can use the same parameterization you have now and generate the 4 x 4 transition matrix from it if you patch up the initialization of q_0 (if you have a probability distribution for it, you can marginalize it out). Then it'll plug into standard HMM likelihood formulations, which may make your life easier. I don't know how much HMM functionality is built into PyMC or how easy it is to define the forward algorithm, but you definitely do not want to be doing discrete sampling here following the generative model (there's usually way too much correlation among states).

Thanks again, @bob-carpenter.

How is q_0 generated?

q_0 is known deterministically. That is, in the experiment that generates the data, I get to choose whether q_0 is 0 or 1.

I’m a little unclear on the V_i distribution—what is E[q_i] and what is observed. I was confused by the V_i being labeled as “observed emissions” and then observed actually showing up.

Apologies for the inconsistent notation. E is a 2x2 emission matrix. For instance, E = [[.05,.95],[.95,.05]], so that if q_i=1 we have V_i ~ pm.Categorical.dist(p=E[q_i]) = pm.Categorical.dist(p=[.95,.05]). The likelihood of the observed data X_i (which takes values 0 or 1) is then evaluated under that distribution.
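
In PyMC terms I mean something like this for the emission/likelihood piece (a sketch; xi_q would be the latent q chain from my earlier code and X the observed 0/1 series):

E = pt.tensor.as_tensor([[0.05, 0.95],
                         [0.95, 0.05]])
V = pm.Categorical("V", p=E[xi_q], observed=X)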

… you definitely do not want to be doing discrete sampling here following the generative model (there’s usually way too much correlation among states).

OK, perhaps I’ll drop the markov_chain() sampling and implement a forward-algorithm likelihood following Example: Hidden Markov Model — NumPyro documentation or How to marginalized Hidden Markov Model with categorical? - #9 by junpenglao (which links to Planet_Sakaar_Data_Science/PyMC_QnA/discourse_2230.ipynb at main · junpenglao/Planet_Sakaar_Data_Science · GitHub).
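
Roughly, the forward pass over the joint 4-state chain that I have in mind would look something like this (a sketch only, with placeholder names: log_pi, log_T, and log_E would be the logs of the joint initial distribution, the 4x4 joint transition matrix, and the joint emission matrix, and X the observed series):

import pytensor
import pytensor.tensor as ptt
import pymc as pm

def forward_loglike(obs, log_pi, log_T, log_E):
    """Forward algorithm in log space.
    obs: (steps,) int vector; log_pi: (4,); log_T: (4, 4); log_E: (4, 2)."""
    log_alpha0 = log_pi + log_E[:, obs[0]]

    def step(v_t, log_alpha_tm1, log_T, log_E):
        # alpha_t(j) = sum_i alpha_{t-1}(i) * T[i, j] * E[j, v_t], in log space
        pred = pm.math.logsumexp(log_alpha_tm1[:, None] + log_T, axis=0, keepdims=False)
        return pred + log_E[:, v_t]

    log_alphas, _ = pytensor.scan(
        step,
        sequences=[obs[1:]],
        outputs_info=[log_alpha0],
        non_sequences=[log_T, log_E],
    )
    return pm.math.logsumexp(log_alphas[-1])

# then, inside the model, something like:
# pm.Potential("loglike", forward_loglike(ptt.as_tensor(X), log_pi, log_T, log_E))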

If you can write your model using DiscreteMarkovChain with a single lag, you can then use marginalize (from pymc_extras, as below) to get the forward algorithm without the discrete sampling.

import numpy as np
import pytensor.tensor as pt
import pymc as pm
from pymc_extras import marginalize
from pymc_extras.distributions import DiscreteMarkovChain

rng = np.random.default_rng(sum(map(ord, "HMM")))

with pm.Model() as m:
    P = pm.Dirichlet("P", a=[1, 1], shape=(2, 2))
    init_dist = pm.Categorical.dist(p=[0.375, 0.625])
    markov_chain = DiscreteMarkovChain("markov_chain", P=P, init_dist=init_dist, steps=40)
    emission = pm.Categorical("emission", p=pt.constant([[0.8, 0.2], [0.4, 0.6]])[markov_chain])
    # or equivalent
    # emission = pm.Bernoulli("emission", p=pt.where(pt.eq(markov_chain, 0), 0.2, 0.6))

with pm.do(m, {"P": [[0.5, 0.5], [0.3, 0.7]]}):
    data = pm.sample_prior_predictive(1, random_seed=rng).prior.squeeze()

print("chain[:20]     : ", data["markov_chain"].values[:20])
print("emmission[:20] : ", data["emission"].values[:20])

observed_m = pm.observe(m, {"emission": data["emission"]})

with marginalize(observed_m, ["markov_chain"]):
    idata = pm.sample(random_seed=rng)

pm.stats.summary(idata)

If your init_dist is known, you can use pm.DiracDelta. Either way, it's marginalized if you use this approach.
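
For what it's worth, here is a sketch of how the hierarchical parameterization from earlier in the thread might plug into this approach (assumptions: joint state s = 2*n_i + q_i, the q transition at step i is selected by n_{i-1}, q_0 = 1 is known while n_0 gets a uniform prior, and X is the observed 0/1 series; all names are placeholders):

import numpy as np
import pytensor.tensor as pt
import pymc as pm
from pymc_extras import marginalize
from pymc_extras.distributions import DiscreteMarkovChain

E = np.array([[0.05, 0.95],
              [0.95, 0.05]])      # emission rows indexed by q
E_joint = E[[0, 1, 0, 1]]         # joint state s = 2n + q emits according to q = s % 2

with pm.Model() as hier_m:
    T_n = pm.Dirichlet("T_n", a=np.ones((2, 2)))      # rows of the n transition matrix
    T_q = pm.Dirichlet("T_q", a=np.ones((2, 2, 2)))   # T_q[n] is the q matrix selected by n

    # P[(n, q), (n', q')] = T_n[n, n'] * T_q[n, q, q'], flattened to 4 x 4
    P_joint = (T_n[:, None, :, None] * T_q[:, :, None, :]).reshape((4, 4))

    # q_0 = 1 known, n_0 uniform -> mass on joint states 1 and 3;
    # if n_0 and q_0 were both known, init_dist = pm.DiracDelta.dist(2 * n0 + q0) would do
    init_dist = pm.Categorical.dist(p=[0.0, 0.5, 0.0, 0.5])

    joint_chain = DiscreteMarkovChain(
        "joint_chain", P=P_joint, init_dist=init_dist,
        steps=len(X) - 1,  # assuming the chain has steps + 1 states, one per observation
    )
    pm.Categorical("V", p=pt.constant(E_joint)[joint_chain], observed=X)

with marginalize(hier_m, ["joint_chain"]):
    idata = pm.sample()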

Colab notebook: Google Colab
