# Implementing the random() function for HMM distribution

Hi,
I’m using a Markov Chain (observed state) model as defined here:

``````class Markov1stOrder(pm.Categorical):
"""
Hidden Markov Model States

Parameters
----------
p_trans : tensor
transition probability
shape = (N_states,N_states)

p_t0 : tensor
starting probability
shape = (N_states)

"""

def __init__(self, p_t0=None, p_trans=None, *args, **kwargs):
super(pm.Categorical, self).__init__(*args, **kwargs)
self.p_trans = p_trans  # Tran probabilities (1st order trans probabilities)
self.p_t0 = p_t0  # Initial probability for t=0 (0th order trans probabilities)
self.mean = 0.
self.mode = tt.cast(0, dtype='int64')

def logp(self, x):
"""How to sample the distribution given observed points (x)"""
p_trans = self.p_trans
p_t0 = self.p_t0

# We need a the probability of the next state given the current state P(x_t | x_t-1)
# Index into trans matrix to generate categorical probabilities for the next point which is based on the
# previous point (except the last)
p_t = p_trans[x[:-1]]

# x_i is what we are trying to predict
x_i = x[1:]  # The first point is not transitioned too, and not sampled from trans distribution
likelihood_t = pm.Categorical.dist(p_t).logp(x_i)  # Likelihood of transitioned to points (dynamic process)
return pm.Categorical.dist(p_t0).logp(x[0]) + tt.sum(likelihood_t)  # Static first point + dynamic next points
``````

I’d like to test if my model can simulate data and then recover the generating parameters, i.e. i’d like to set the parameters and call `pm.sample_prior_predictive()`, then reinitialize the model and recover the parameters.

To sample from the prior, i need to implement the `random()` function right?
I was following an example here, and came up with this.

``````    def random(self, point=None, size=None):
p_trans = self.p_trans  # Tran probabilities (1st order trans probabilities)
p_t0 = self.p_t0
p_t0_, p_trans_ = pm.distributions.multivariate.draw_values([p_t0, p_trans], point=point)

def single_dim_gen_sample(p_t0, p_trans, size):
x = [pm.distributions.dist_math.random_choice(p=p_t0, size=size)]

for _ in range(size-1):
x_i = pm.distributions.dist_math.random_choice(p=p_trans[x[-1], :], size=size)
x.append(x_i)

return x

return pm.distributions.multivariate.generate_samples(single_dim_gen_sample, p_t0=p_t0_, p_trans=p_trans_, size=size, broadcast_shape=(size,))

``````

I’m sure this is not the best implementation, but i don’t see how to vectorize it as each point in a Markov chain depends on the last…

My specific issue is:
I’m not to sure how the size argument works… is the number of samples you want to draw right?
One of my parameters (p_t0, where p_t0.shape=(5,)) get sampled by `draw_values` to be a (5,500) matrix, the other parameter seems to stay the same size as it was at initialization (5,5). Why? Why is it not broadcast to (5,5,500)?

Additions and comments to this approach are appreciated!

@lucianopaz works the most with random generation method currently, so I will direct this to him
FYI, for HMM you should check out How to marginalized Markov Chain with categorical? which is a faster implementation.

Hi @bdyetton! Your implementation looks nice. I don’t think that you can vectorize the time step axis. However there are some things that you should be aware of to continue with the `random` implementation:

1. The distribution’s parameters are usually converted to tensors with `as_tensor_variable`. It’s not mandatory to do this but it is good practice. The thing is that sometimes `p_trans` and `p_t0` may be other random variables, which are themselves theano tensors. That’s the reason we call `pm.distributions.distribution.draw_values([p_t0, p_trans], point=point, size=size)`, to draw numpy array values for the parameters.
2. `draw_values` does one of many things. If the parameters were numbers, numpy arrays, tensor constants or shared, their value is returned directly, ignoring the `size` and `point` arguments that are passed. If a parameter is a random variable, and a value is given to it in the `point` dictionary, that value is returned ignoring the `size` argument (this happens in `sample_posterior_predictive`). If the random variable’s was not given a value in the `point` dictionary, then its `random` method is called, passing `point` and `size` to it. Finally, if non of the above hold (e.g. for deterministics), then the theano graph is compiled into a function and called (kind of ignoring `size`).
3. Finally, `generate_samples` should be made aware of the `self.shape` for the `broadcast_shape`.

I know it’s a lot of information, I should definitely write up some detailed guide on how to write custom `random` methods for distributions. I’ll do it when I have some spare time.

Going back to your questions

The `size` argument can either be an `int` (normal case) or a tuple shape (kind of tricky). It controls the number of independent samples you want to return of your random variable. It is not aware of the `shape` a single draw of the random variable has (i.e. if the RV is observed, then its `shape` is modified by the observed kwarg, and calling `random(size=_size)` would end up returning a `tuple(_size) + RV.shape`, shaped array). The `size` part of the output’s shape should always be at the beginning, so the `(5, 500)` likely is an error. What version of pymc3 are you using?

`draw_values` does not broadcast the drawn values with each other. We’ve recently added `pm.distributions.distribution.broadcast_distribution_samples` to help do that.

Hi @lucianopaz,
Thanks! This was very informative.
My now (working) random function is:

``````class Markov1stOrder(pm.Categorical):
"""
Hidden Markov Model States

Parameters
----------
p_trans : tensor
transition probability
shape = (N_states,N_states)

p_t0 : tensor
starting probability
shape = (N_states)

"""

def __init__(self, p_t0=None, p_trans=None, *args, **kwargs):
super(pm.Categorical, self).__init__(*args, **kwargs)
self.p_trans = tt.as_tensor_variable(p_trans)  # Tran probabilities (1st order trans probabilities)
self.p_t0 = tt.as_tensor_variable(p_t0)  # Initial probability for t=0 (0th order trans probabilities)
self.mean = 0.
self.mode = tt.cast(0, dtype='int64')

def logp(self, x):
"""How to sample the distribution given observed points (x)"""
p_trans = self.p_trans
p_t0 = self.p_t0

# We need a the probability of the next state given the current state P(x_t | x_t-1)
# Index into trans matrix to generate categorical probabilities for the next point which is based on the
# previous point (except the last)
p_t = p_trans[x[:-1]]

# x_i is what we are trying to predict
x_i = x[1:]  # The first point is not transitioned too, and not sampled from trans distribution
likelihood_t = pm.Categorical.dist(p_t).logp(x_i)  # Likelihood of transitioned to points (dynamic process)
return pm.Categorical.dist(p_t0).logp(x[0]) + tt.sum(likelihood_t)  # Static first point + dynamic next points

def random(self, point=None, size=None):
p_trans = self.p_trans  # Tran probabilities (1st order trans probabilities)
p_t0 = self.p_t0
p_t0_, p_trans_ = pm.distributions.multivariate.draw_values([p_t0, p_trans], point=point, size=size)

def single_dim_gen_sample(p_t0, p_trans, size, hmm_len):
x_i = pm.distributions.dist_math.random_choice(p=p_t0, size=size)  # first sample of the chain
x = [x_i]

for _ in range(hmm_len-1):
x_i = pm.distributions.dist_math.random_choice(p=p_trans[x_i, :], size=size)  # all others
x.append(x_i)

return np.stack(x,-1)

return pm.distributions.multivariate.generate_samples(single_dim_gen_sample,
p_t0=p_t0_, p_trans=p_trans_,
size=size,
Which returns a (size,RV.shape) sample (only tested when shape=scalar). I’ve yet to test with RV as parameters, but i think that should be taken care of by `generate_samples` and `draw_values` if i understood there function correctly.