Problem using emissions as likelihood for hidden Markov model

Hi
I’m hoping someone can help me with a problem that’s slowly robbing me of my sanity. I’m trying to instantiate a hidden Markov model to model participant responses in an experiment, and am particularly interested in inferring their transition probabilities. This is the code of my model:

import pymc as pm
import pytensor
import pytensor.tensor as pt
from pymc.pytensorf import collect_default_updates
import numpy as np
import arviz as az


# Named dimensions for the model.
# NOTE(review): assumes `n_timesteps` is defined earlier in the session — confirm.
coords={'time':np.arange(n_timesteps), 
        'state':[0, 1], 
        'next_state':[0, 1]}

with pm.Model(coords=coords) as m:    
    # One Markov-chain step: given the transition matrix P for this timestep
    # and the previous state x_tm1, draw x_t from the row P[x_tm1].
    def step(P, x_tm1):
        x_t = pm.Categorical.dist(p=P[x_tm1])
        # collect_default_updates propagates the RNG state updates that
        # pytensor.scan needs when a random draw happens inside the step.
        return x_t, collect_default_updates(x_t)
    
    # Unroll the chain over time: `Ps` supplies one transition matrix per
    # timestep (scanned over the leading axis), `x0` seeds the recursion.
    def markov_chain(x0, Ps, shape=None):
        states, _ = pytensor.scan(step,
                                  outputs_info=[x0],
                                  sequences=[Ps])        
        return states

    # 2x2 row-stochastic transition matrices: one for baseline trials, one
    # for trials with an explicit cue.
    transition_matrix_baseline = pm.Dirichlet('transition_matrix_baseline', a=[1, 1], dims=['state', 'next_state'])
    transition_matrix_explicit = pm.Dirichlet('transition_matrix_explicit', a=[1, 1], dims=['state', 'next_state'])
    
    # Repeat the baseline matrix across time, then overwrite the explicit-cue
    # timesteps. NOTE(review): assumes `explicit_transition_indicies` is an
    # index array defined outside this snippet — confirm.
    transition_matricies = pt.stack([transition_matrix_baseline] * n_timesteps)
    transition_matricies = transition_matricies[explicit_transition_indicies].set(transition_matrix_explicit)
    transition_matricies = pm.Deterministic('Ps', transition_matricies, dims=['time', 'state', 'next_state'])
    
    # Initial-state prior: P(x0=1) depends on which cue started the session.
    p_x0 = 0.1 if initial_state == "a" else 0.9
    x0 = pm.Bernoulli('x0', p=p_x0)
    # Latent state sequence, one value per timestep.
    hidden_states = pm.CustomDist('hidden_states', x0, transition_matricies, dist=markov_chain, dims='time')

    # Fixed emission probabilities: P(emission | hidden state).
    emission_ps = pm.math.stack([[0.15, 0.85], [0.85, 0.15]])
    # NOTE(review): the time-stacking then indexing [0, ...] below looks
    # redundant — `emission_ps[hidden_states]` would seem equivalent; verify.
    emission_matricies = pm.math.stack([emission_ps] * n_timesteps)
    emissions = pm.Categorical(
        "emissions", p=emission_matricies[0, hidden_states], dims=["time"]
    )

    # Specify known states
    # Soft constraint: timesteps where the true state is known contribute
    # log-prob ~log(0.98) when the chain matches and ~log(0.02) when it doesn't.
    # NOTE(review): assumes `known_states` is a 1-D float array with NaN for
    # unknown timesteps — confirm.
    known_indices = np.where(~np.isnan(known_states))[0]
    is_correct_state = pt.eq(hidden_states[known_indices], known_states[known_indices])
    logp = pt.sum(pt.switch(is_correct_state, -0.02, -3.91))
    pm.Potential("hidden_states_observed", logp)


# Attach the observed emission sequence to the model and sample.
# NOTE(review): single chain only — convergence diagnostics (r_hat) need >=2 chains.
with pm.observe(m, {"emissions": cue_concordant_with_a.values}):
    idata = pm.sample(chains=1)

My problem has come with the addition of the emission probabilities. When I run the model as seen here, I get IndexError: index out of bounds. I don’t get the error if I change the observed variable from 'emissions' to 'hidden_states' though. I’ve tried everything I can think of, and scoured the discourse, but I can’t figure out what is going wrong. I’ll be eternally grateful if anyone can tell me what’s going on here.

PyMC is probably using a default metropolis sampler for the hidden states that doesn’t know it has a bounded domain and can propose out of bounds values.

You can clip the values to avoid this by doing something like hidden_states = pm.math.clip(hidden_states, 0, 1)

If I read correctly that you have two possible states

Otherwise you could assign the variable to a Bernoulli step sampler, but that sampler may be too strict and only accept Bernoulli variables. We should relax that

Hi Ricardo
Thanks for such a quick reply. Unfortunately, adding the line hidden_states = pm.math.clip(hidden_states, 0, 1) after the line declaring the hidden_states CustomDist didn’t fix the error.

In terms of trying Bernoulli step sampler, I can’t find reference to one in the API. How do I go about changing the hidden_states sampler from the current (Metropolis) one to that?

Thanks for your help

Never mind. I used my brain and figured you must have meant BinaryMetropolis. I’ve changed the sampler for hidden_states to that and it’s sampling beautifully. Thank you :grin:.

2 Likes

Hi Ricardo. One more question. I’ve got the model working for individual experiment participants now, but would like to make it hierarchical. For reference, here’s the code for my individual model.

# Named dimensions for the per-participant model.
coords = {
    "participant": participants,
    "time": np.arange(n_timesteps),
    "state": ["xt=a", "xt=b"],
    "next_state": ["xt+1=a", "xt+1=b"],
    "emission": ["emission_concordant_with_a", "emission_concordant_with_b"],
}

with pm.Model(coords=coords) as model:

    # One Markov-chain step: draw x_t from the row of P selected by the
    # previous state x_tm1; return the RNG updates scan needs.
    def step(P, x_tm1):
        x_t = pm.Categorical.dist(p=P[x_tm1])
        return x_t, collect_default_updates(x_t)

    # Unroll the chain over the leading (time) axis of Ps, seeded by x0.
    def markov_chain(x0, Ps, shape=None):
        states, _ = pytensor.scan(step, outputs_info=[x0], sequences=[Ps])

        return states

    # Set transition matricies with informative priors
    # Baseline favours staying in the current state; explicit cues favour switching.
    transition_matrix_baseline = pm.Dirichlet(
        "transition_matrix_baseline",
        a=[[5, 1.2], [1.2, 5]],
        dims=["participant", "state", "next_state"],
    )
    transition_matrix_explicit = pm.Dirichlet(
        "transition_matrix_explicit",
        a=[[1.2, 5], [5, 1.2]],
        dims=["participant", "state", "next_state"],
    )

    # Repeat the baseline matrix over time, then move the participant axis
    # first: (time, participant, state, next_state) -> (participant, time, state, next_state).
    transition_matricies = pt.stack(
        [transition_matrix_baseline] * n_timesteps
    ).dimshuffle(1, 0, 2, 3)

    # Overwrite the explicit-cue timesteps for each participant with that
    # participant's explicit transition matrix.
    for participant_index, explicit_transition_indicies in enumerate(
        model_data["explicit_transition_indicies"]
    ):
        transition_matricies = transition_matricies[
            participant_index, explicit_transition_indicies, :, :
        ].set(transition_matrix_explicit[participant_index, :, :])

    transition_matricies = pm.Deterministic(
        "transition_matricies",
        transition_matricies,
        dims=["participant", "time", "state", "next_state"],
    )

    # Per-participant initial-state probability from the starting cue.
    # NOTE(review): assumes model_data has an "initial_state" key — confirm
    # against how model_data is built.
    p_x0s = pm.Data(
        "initial_state",
        [0.1 if x == "a" else 0.9 for x in model_data["initial_state"]],
        dims="participant",
    )
    x0 = pm.Bernoulli("x0", p=p_x0s, dims="participant")
    # Latent state sequence per participant.
    hidden_states = pm.CustomDist(
        "hidden_states",
        x0,
        transition_matricies,
        dist=markov_chain,
        dims=["participant", "time"],
    )

    # Observed emissions. NOTE(review): the data name "obsereved_emissions"
    # is misspelled; renaming would change the inference-data variable name.
    observed_emissions = pm.Data(
        "obsereved_emissions",
        model_data["cue_concordant_with_a"],
        dims=["participant", "time"],
    )

    # Learned emission probabilities: P(emission | true state), per participant.
    emission_ps = pm.Dirichlet(
        "p_emission_concordant_with_true_state",
        a=[[3, 1], [1, 3]],
        dims=["participant", "state", "emission"],
    )
    # NOTE(review): stacking over time then indexing [0, ...] looks redundant —
    # `emission_ps[hidden_states]`-style indexing may be what is intended; verify.
    emission_matricies = pm.math.stack([emission_ps] * n_timesteps)
    emissions = pm.Categorical(
        "emissions",
        p=emission_matricies[0, hidden_states],
        dims=["participant", "time"],
        observed=observed_emissions,
    )

    # Specify known states
    # Soft constraint: ~log(0.98) log-prob when the chain matches a known
    # state, ~log(0.02) when it doesn't.
    # NOTE(review): np.where(...)[0] keeps only the first axis of the indices;
    # if known_states is (participant, time) this indexes rows, not (row, col)
    # pairs — confirm the intended shape.
    known_indices = np.where(~np.isnan(known_states))[0]
    is_correct_state = pt.eq(hidden_states[known_indices], known_states[known_indices])
    logp = pt.sum(pt.switch(is_correct_state, -0.02, -3.91))
    pm.Potential("hidden_states_observed", logp)

To convert it to a hierarchical model, I’ve added the extra dimension, participant to the distributions and data, but I’m confused about how to adapt my scan function to cope with it. I’ve looked at some similar questions on the discourse, but none quite answer this question. Below is the code for the hierarchical model which (hopefully) only needs the scan function adapted to work.

model_data = { #Each value is a list of arrays
    "participants": participants,
    "explicit_transition_indicies": explicit_transition_index_arrays,
    "initial_states": initial_states_array,
    "length": length_arrays,
    "known_states": known_state_arrays,
    "cue_concordant_with_a": cue_concordant_with_a_arrays,
}

# Named dimensions for the hierarchical model.
coords = {
    "participant": participants,
    "time": np.arange(n_timesteps),
    "state": ["xt=a", "xt=b"],
    "next_state": ["xt+1=a", "xt+1=b"],
    "emission": ["emission_concordant_with_a", "emission_concordant_with_b"],
}

with pm.Model(coords=coords) as model:

    # One Markov-chain step: draw x_t from the row of P selected by the
    # previous state x_tm1; return the RNG updates scan needs.
    def step(P, x_tm1):
        x_t = pm.Categorical.dist(p=P[x_tm1])
        return x_t, collect_default_updates(x_t)

    # Unroll the chain over the leading (time) axis of Ps, seeded by x0.
    # NOTE(review): scan iterates the LEFT-most axis, so Ps may need a
    # transpose to put time first when a participant axis is present.
    def markov_chain(x0, Ps, shape=None):
        states, _ = pytensor.scan(step, outputs_info=[x0], sequences=[Ps])

        return states

    # Set transition matricies with informative priors
    # Baseline favours staying in the current state; explicit cues favour switching.
    transition_matrix_baseline = pm.Dirichlet(
        "transition_matrix_baseline",
        a=[[5, 1.2], [1.2, 5]],
        dims=["participant", "state", "next_state"],
    )
    transition_matrix_explicit = pm.Dirichlet(
        "transition_matrix_explicit",
        a=[[1.2, 5], [5, 1.2]],
        dims=["participant", "state", "next_state"],
    )

    # Repeat the baseline matrix over time, then move the participant axis
    # first: (time, participant, state, next_state) -> (participant, time, state, next_state).
    transition_matricies = pt.stack(
        [transition_matrix_baseline] * n_timesteps
    ).dimshuffle(1, 0, 2, 3)

    # Overwrite each participant's explicit-cue timesteps with that
    # participant's explicit transition matrix.
    for participant_index, explicit_transition_indicies in enumerate(
        model_data["explicit_transition_indicies"]
    ):
        transition_matricies = transition_matricies[
            participant_index, explicit_transition_indicies, :, :
        ].set(transition_matrix_explicit[participant_index, :, :])

    transition_matricies = pm.Deterministic(
        "transition_matricies",
        transition_matricies,
        dims=["participant", "time", "state", "next_state"],
    )

    # Per-participant initial-state probability from the starting cue.
    # NOTE(review): model_data defines "initial_states" (plural) above, but
    # "initial_state" is read here — this looks like a KeyError; confirm.
    p_x0s = pm.Data(
        "initial_state",
        [0.1 if x == "a" else 0.9 for x in model_data["initial_state"]],
        dims="participant",
    )
    x0 = pm.Bernoulli("x0", p=p_x0s, dims="participant")
    # Latent state sequence per participant.
    hidden_states = pm.CustomDist(
        "hidden_states",
        x0,
        transition_matricies,
        dist=markov_chain,
        dims=["participant", "time"],
    )

    # Observed emissions. NOTE(review): the data name "obsereved_emissions"
    # is misspelled; renaming would change the inference-data variable name.
    observed_emissions = pm.Data(
        "obsereved_emissions",
        model_data["cue_concordant_with_a"],
        dims=["participant", "time"],
    )

    # Learned emission probabilities: P(emission | true state), per participant.
    emission_ps = pm.Dirichlet(
        "p_emission_concordant_with_true_state",
        a=[[3, 1], [1, 3]],
        dims=["participant", "state", "emission"],
    )
    # NOTE(review): stacking over time then indexing [0, ...] looks redundant —
    # `emission_ps[hidden_states]`-style indexing may be what is intended; verify.
    emission_matricies = pm.math.stack([emission_ps] * n_timesteps)
    emissions = pm.Categorical(
        "emissions",
        p=emission_matricies[0, hidden_states],
        dims=["participant", "time"],
        observed=observed_emissions,
    )

    # Specify known states
    # Soft constraint: ~log(0.98) log-prob when the chain matches a known
    # state, ~log(0.02) when it doesn't.
    known_states = np.stack(model_data["known_states"])
    # NOTE(review): known_states is now (participant, time) but
    # np.where(...)[0] keeps only the first (participant) axis of the indices,
    # so this indexes whole rows rather than (participant, time) pairs — confirm.
    known_indices = np.where(~np.isnan(known_states))[0]
    is_correct_state = pt.eq(hidden_states[known_indices], known_states[known_indices])
    logp = pt.sum(pt.switch(is_correct_state, -0.02, -3.91))
    pm.Potential("hidden_states_observed", logp)

Are you able to point me in the right direction to get the hierarchical scan working?

The Categorical / Scan should work fine with more dimensions, just keep in mind that Scan expects the time dimension on the left, so you may need to transpose when you define sequences?

Thanks. That’s solved it!