Implementing the random() function for an HMM distribution

Hi,
I’m using a Markov Chain (observed state) model as defined here:

import numpy as np
import pymc3 as pm
import theano.tensor as tt


class Markov1stOrder(pm.Categorical):
    """
    Hidden Markov Model States
    Adapted from https://github.com/hstrey/Hidden-Markov-Models-pymc3/blob/master/Multi-State%20HMM.ipynb

    Parameters
    ----------
    p_trans : tensor
        transition probability
        shape = (N_states,N_states)

    p_t0 : tensor
         starting probability
         shape = (N_states,)

    """

    def __init__(self, p_t0=None, p_trans=None, *args, **kwargs):
        # Deliberately skip pm.Categorical.__init__ (which requires p) and
        # call pm.Discrete's constructor instead
        super(pm.Categorical, self).__init__(*args, **kwargs)
        self.p_trans = p_trans  # Transition probabilities (1st-order)
        self.p_t0 = p_t0  # Initial probabilities for t=0 (0th-order)
        self.mean = 0.
        self.mode = tt.cast(0, dtype='int64')

    def logp(self, x):
        """Log-likelihood of the chain given the observed points (x)"""
        p_trans = self.p_trans
        p_t0 = self.p_t0

        # We need the probability of the next state given the current state, P(x_t | x_t-1).
        # Index into the transition matrix to generate categorical probabilities for each point based on the
        # previous point (except the last, which has no successor)
        p_t = p_trans[x[:-1]]

        # x_i is what we are trying to predict
        x_i = x[1:]  # The first point is not transitioned to, so it is not scored by the transition distribution
        likelihood_t = pm.Categorical.dist(p_t).logp(x_i)  # Likelihood of the transitioned-to points (dynamic process)
        return pm.Categorical.dist(p_t0).logp(x[0]) + tt.sum(likelihood_t)  # Static first point + dynamic next points

I’d like to test whether my model can simulate data and then recover the generating parameters, i.e. I’d like to set the parameters and call pm.sample_prior_predictive(), then reinitialize the model and recover the parameters from the simulated data.
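
Concretely, the plan is something like this (just a sketch; the parameter values and chain length here are made up):

import numpy as np
import pymc3 as pm
import theano.tensor as tt

# Hypothetical ground-truth parameters for a 2-state chain
p_t0_true = tt.as_tensor_variable(np.array([0.5, 0.5]))
p_trans_true = tt.as_tensor_variable(np.array([[0.9, 0.1],
                                               [0.2, 0.8]]))

with pm.Model():
    states = Markov1stOrder('states', p_t0=p_t0_true, p_trans=p_trans_true,
                            shape=100)  # a chain of length 100
    # this is the call that needs random() to be implemented
    sim = pm.sample_prior_predictive(samples=1)['states']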

To sample from the prior, I need to implement the random() method, right?
I was following an example here, and came up with this:

    def random(self, point=None, size=None):
        p_trans = self.p_trans  # Transition probabilities (1st-order)
        p_t0 = self.p_t0
        p_t0_, p_trans_ = pm.distributions.multivariate.draw_values([p_t0, p_trans], point=point)

        def single_dim_gen_sample(p_t0, p_trans, size):
            x = [pm.distributions.dist_math.random_choice(p=p_t0, size=size)]

            for _ in range(size-1):
                x_i = pm.distributions.dist_math.random_choice(p=p_trans[x[-1], :], size=size)
                x.append(x_i)

            return x

        return pm.distributions.multivariate.generate_samples(single_dim_gen_sample, p_t0=p_t0_, p_trans=p_trans_, size=size, broadcast_shape=(size,))


I’m sure this is not the best implementation, but I don’t see how to vectorize it, as each point in a Markov chain depends on the last…

My specific issue is:
I’m not too sure how the size argument works… it’s the number of samples you want to draw, right?
One of my parameters (p_t0, where p_t0.shape=(5,)) gets sampled by draw_values as a (5, 500) matrix, while the other parameter seems to keep the shape it had at initialization, (5, 5). Why? Why is it not broadcast to (5, 5, 500)?

Additions and comments on this approach are appreciated!

@lucianopaz works the most with the random generation methods currently, so I will direct this to him :slight_smile:
FYI, for HMMs you should check out How to marginalized Markov Chain with categorical?, which is a faster implementation.

Hi @bdyetton! Your implementation looks nice. I don’t think you can vectorize the time-step axis. However, there are some things you should be aware of to continue with the random implementation:

  1. The distribution’s parameters are usually converted to tensors with as_tensor_variable. It’s not mandatory to do this, but it is good practice. The thing is that sometimes p_trans and p_t0 may be other random variables, which are themselves theano tensors. That’s the reason we call pm.distributions.distribution.draw_values([p_t0, p_trans], point=point, size=size): to draw numpy array values for the parameters.
  2. draw_values does one of several things. If the parameters were numbers, numpy arrays, tensor constants or shared variables, their values are returned directly, ignoring the size and point arguments. If a parameter is a random variable and a value is given for it in the point dictionary, that value is returned, ignoring the size argument (this happens in sample_posterior_predictive). If the random variable was not given a value in the point dictionary, its random method is called, passing point and size to it. Finally, if none of the above holds (e.g. for deterministics), the theano graph is compiled into a function and called (kind of ignoring size). See the sketch after this list.
  3. Finally, generate_samples should be made aware of self.shape through the broadcast_shape argument.
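
To make point 2 concrete, here is a minimal sketch (assuming a recent pymc3; the variable names are made up):

import numpy as np
import pymc3 as pm
from pymc3.distributions.distribution import draw_values

with pm.Model():
    p_t0 = pm.Dirichlet('p_t0', a=np.ones(5), shape=5)  # a free random variable
    p_trans_const = np.full((5, 5), 0.2)                # a plain numpy array

    # Arrays come back unchanged (point/size are ignored); a free RV with no
    # entry in `point` has its random() called, so `size` ends up prepended
    p_t0_, p_trans_ = draw_values([p_t0, p_trans_const], point=None, size=500)

print(p_trans_.shape)  # (5, 5)   -- returned as-is
print(p_t0_.shape)     # (500, 5) -- size axis prepended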

I know it’s a lot of information; I should definitely write up a detailed guide on how to write custom random methods for distributions. I’ll do it when I have some spare time.

Going back to your questions:

The size argument can either be an int (the normal case) or a tuple shape (kind of tricky). It controls the number of independent samples of your random variable you want returned. It is not aware of the shape a single draw of the random variable has (i.e. if the RV is observed, its shape is set by the observed kwarg), and calling random(size=_size) ends up returning an array of shape tuple(_size) + RV.shape. The size part of the output’s shape should always be at the beginning, so the (5, 500) is likely an error. What version of pymc3 are you using?
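
For example, with a stock distribution:

import pymc3 as pm

d = pm.Normal.dist(mu=0., sd=1., shape=5)  # a single draw has shape (5,)
print(d.random(size=500).shape)            # (500, 5), i.e. tuple(size) + RV.shape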

draw_values does not broadcast the drawn values with each other. We’ve recently added pm.distributions.distribution.broadcast_distribution_samples to help do that.
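
Roughly like this (a sketch; the exact output shapes may depend on your pymc3 version, and the arrays here are stand-ins for drawn parameter values):

import numpy as np
from pymc3.distributions.distribution import broadcast_distribution_samples

size = 500
p_t0_ = np.random.dirichlet(np.ones(5), size=size)  # (500, 5): has the size prefix
p_trans_ = np.full((5, 5), 0.2)                     # (5, 5): no size prefix

# Broadcast the draws against each other, treating the leading `size` axis
# specially; both outputs should end up with a common shape with `size`
# at the front (e.g. (500, 5, 5))
p_t0_b, p_trans_b = broadcast_distribution_samples([p_t0_, p_trans_], size=size)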

Hi @lucianopaz,
Thanks! This was very informative.
My now-working random method is:

class Markov1stOrder(pm.Categorical):
    """
    Hidden Markov Model States
    Adapted from https://github.com/hstrey/Hidden-Markov-Models-pymc3/blob/master/Multi-State%20HMM.ipynb

    Parameters
    ----------
    p_trans : tensor
        transition probability
        shape = (N_states,N_states)

    p_t0 : tensor
         starting probability
         shape = (N_states,)

    """

    def __init__(self, p_t0=None, p_trans=None, *args, **kwargs):
        # Deliberately skip pm.Categorical.__init__ (which requires p) and
        # call pm.Discrete's constructor instead
        super(pm.Categorical, self).__init__(*args, **kwargs)
        self.p_trans = tt.as_tensor_variable(p_trans)  # Transition probabilities (1st-order)
        self.p_t0 = tt.as_tensor_variable(p_t0)  # Initial probabilities for t=0 (0th-order)
        self.mean = 0.
        self.mode = tt.cast(0, dtype='int64')

    def logp(self, x):
        """Log-likelihood of the chain given the observed points (x)"""
        p_trans = self.p_trans
        p_t0 = self.p_t0

        # We need the probability of the next state given the current state, P(x_t | x_t-1).
        # Index into the transition matrix to generate categorical probabilities for each point based on the
        # previous point (except the last, which has no successor)
        p_t = p_trans[x[:-1]]

        # x_i is what we are trying to predict
        x_i = x[1:]  # The first point is not transitioned to, so it is not scored by the transition distribution
        likelihood_t = pm.Categorical.dist(p_t).logp(x_i)  # Likelihood of the transitioned-to points (dynamic process)
        return pm.Categorical.dist(p_t0).logp(x[0]) + tt.sum(likelihood_t)  # Static first point + dynamic next points

    def random(self, point=None, size=None):
        p_trans = self.p_trans  # Transition probabilities (1st-order)
        p_t0 = self.p_t0
        p_t0_, p_trans_ = pm.distributions.multivariate.draw_values([p_t0, p_trans], point=point, size=size)

        def single_dim_gen_sample(p_t0, p_trans, size, hmm_len):
            x_i = pm.distributions.dist_math.random_choice(p=p_t0, size=size)  # first sample of the chain
            x = [x_i]

            for _ in range(hmm_len - 1):
                x_i = pm.distributions.dist_math.random_choice(p=p_trans[x_i, :], size=size)  # all other samples
                x.append(x_i)

            return np.stack(x, -1)  # time axis last: (size, hmm_len)

        return pm.distributions.multivariate.generate_samples(single_dim_gen_sample,
                                                              p_t0=p_t0_, p_trans=p_trans_,
                                                              size=size,
                                                              broadcast_shape=(size,),
                                                              hmm_len=self.shape)  # chain length

This returns a (size, RV.shape)-shaped sample (only tested with a scalar shape). I’ve yet to test with RVs as parameters, but I think that should be taken care of by generate_samples and draw_values, if I understood their function correctly.
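
In case it helps anyone, the full round-trip test then looks roughly like this (a sketch; the Dirichlet priors and "true" values are made up, and I haven’t verified the 2-d Dirichlet shape handling):

import numpy as np
import pymc3 as pm

n_states, chain_len = 3, 200
p_t0_true = np.array([0.8, 0.1, 0.1])
p_trans_true = np.array([[0.90, 0.05, 0.05],
                         [0.10, 0.80, 0.10],
                         [0.05, 0.15, 0.80]])

# 1) Simulate a chain from fixed parameters via the prior predictive
with pm.Model():
    Markov1stOrder('states', p_t0=p_t0_true, p_trans=p_trans_true,
                   shape=chain_len)
    sim = pm.sample_prior_predictive(samples=1)['states'][0]

# 2) Put Dirichlet priors on the parameters and fit to the simulated chain
with pm.Model():
    p_t0 = pm.Dirichlet('p_t0', a=np.ones(n_states), shape=n_states)
    p_trans = pm.Dirichlet('p_trans', a=np.ones((n_states, n_states)),
                           shape=(n_states, n_states))
    Markov1stOrder('states', p_t0=p_t0, p_trans=p_trans, observed=sim)
    trace = pm.sample()  # check the posteriors cover p_t0_true / p_trans_true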

Sorry, when I said (5, 500) before, I really meant (500, 5).

Cheers @lucianopaz, and @junpenglao (as always :wink: ) !!
