 # Implementing the random() function for HMM distribution

Hi,
I’m using a Markov Chain (observed state) model as defined here:

``````
import pymc3 as pm
import theano.tensor as tt


class Markov1stOrder(pm.Categorical):
    """
    Hidden Markov Model states.

    Parameters
    ----------
    p_trans : tensor
        transition probabilities
        shape = (N_states, N_states)

    p_t0 : tensor
        starting probabilities
        shape = (N_states,)
    """

    def __init__(self, p_t0=None, p_trans=None, *args, **kwargs):
        # Skip pm.Categorical.__init__ (which expects p) and initialize pm.Discrete directly
        super(pm.Categorical, self).__init__(*args, **kwargs)
        self.p_trans = p_trans  # Transition probabilities (1st-order trans probabilities)
        self.p_t0 = p_t0  # Initial probabilities for t=0 (0th-order trans probabilities)
        self.mean = 0.
        self.mode = tt.cast(0, dtype='int64')

    def logp(self, x):
        """Log-likelihood of the chain given observed points (x)"""
        p_trans = self.p_trans
        p_t0 = self.p_t0

        # We need the probability of the next state given the current state, P(x_t | x_{t-1}).
        # Index into the transition matrix to generate categorical probabilities for each point
        # based on the previous point (all except the last)
        p_t = p_trans[x[:-1]]

        # x_i is what we are trying to predict
        x_i = x[1:]  # The first point is not transitioned to, so it is not scored by the transition distribution
        likelihood_t = pm.Categorical.dist(p_t).logp(x_i)  # Likelihood of transitioned-to points (dynamic process)
        return pm.Categorical.dist(p_t0).logp(x[0]) + tt.sum(likelihood_t)  # Static first point + dynamic next points
``````
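In other words, `logp` implements the chain-rule factorization of the joint probability of the chain:

$$\log p(x_{0:T}) = \log p(x_0 \mid p_{t0}) + \sum_{t=1}^{T} \log p(x_t \mid x_{t-1}, p_\mathrm{trans})$$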

I’d like to test if my model can simulate data and then recover the generating parameters, i.e. I’d like to set the parameters and call `pm.sample_prior_predictive()`, then reinitialize the model and recover the parameters.
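Roughly, the workflow I have in mind looks like this (just a sketch; the state count, true values, and Dirichlet priors are only illustrative):

``````
import numpy as np
import pymc3 as pm

n_states, chain_len = 3, 100
true_p_t0 = np.array([0.8, 0.1, 0.1])
true_p_trans = np.array([[0.90, 0.05, 0.05],
                         [0.10, 0.80, 0.10],
                         [0.05, 0.05, 0.90]])

# 1. Simulate a chain from fixed parameters via the prior predictive.
with pm.Model():
    Markov1stOrder('states', p_t0=true_p_t0, p_trans=true_p_trans, shape=chain_len)
    sim = pm.sample_prior_predictive(samples=1)
obs = sim['states'][0]

# 2. Reinitialize the model with priors on the parameters and try to recover them.
with pm.Model():
    p_t0 = pm.Dirichlet('p_t0', a=np.ones(n_states))
    p_trans = pm.Dirichlet('p_trans', a=np.ones((n_states, n_states)),
                           shape=(n_states, n_states))
    Markov1stOrder('states', p_t0=p_t0, p_trans=p_trans, observed=obs)
    trace = pm.sample()
``````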

To sample from the prior, I need to implement the `random()` method, right?
I was following an example here and came up with this:

``````
    def random(self, point=None, size=None):
        p_trans = self.p_trans  # Tran probabilities (1st order trans probabilities)
        p_t0 = self.p_t0
        p_t0_, p_trans_ = pm.distributions.multivariate.draw_values([p_t0, p_trans], point=point)

        def single_dim_gen_sample(p_t0, p_trans, size):
            x = [pm.distributions.dist_math.random_choice(p=p_t0, size=size)]

            for _ in range(size - 1):
                x_i = pm.distributions.dist_math.random_choice(p=p_trans[x[-1], :], size=size)
                x.append(x_i)

            return x

        return pm.distributions.multivariate.generate_samples(single_dim_gen_sample,
                                                               p_t0=p_t0_, p_trans=p_trans_,
                                                               size=size,
                                                               broadcast_shape=(size,))
``````

I’m sure this is not the best implementation, but I don’t see how to vectorize it, as each point in a Markov chain depends on the last…

My specific issues are:
I’m not too sure how the `size` argument works… it is the number of samples you want to draw, right?
One of my parameters (`p_t0`, where `p_t0.shape == (5,)`) gets sampled by `draw_values` as a `(5, 500)` matrix, while the other parameter seems to keep the shape it had at initialization, `(5, 5)`. Why? Why is it not broadcast to `(5, 5, 500)`?

@lucianopaz works the most with the random generation methods currently, so I will direct this to him. FYI, for HMMs you should check out “How to marginalized Markov Chain with categorical?”, which is a faster implementation.

Hi @bdyetton! Your implementation looks nice. I don’t think that you can vectorize the time-step axis. However, there are some things that you should be aware of to continue with the `random` implementation:

1. The distribution’s parameters are usually converted to tensors with `as_tensor_variable`. It’s not mandatory to do this, but it is good practice. The thing is that sometimes `p_trans` and `p_t0` may be other random variables, which are themselves theano tensors. That’s the reason we call `pm.distributions.distribution.draw_values([p_t0, p_trans], point=point, size=size)`: to draw numpy array values for the parameters.
2. `draw_values` does one of several things (see the sketch after this list). If the parameters are numbers, numpy arrays, tensor constants or shared variables, their values are returned directly, ignoring the `size` and `point` arguments. If a parameter is a random variable and a value is given for it in the `point` dictionary, that value is returned, ignoring the `size` argument (this is what happens in `sample_posterior_predictive`). If the random variable was not given a value in the `point` dictionary, its `random` method is called, passing `point` and `size` to it. Finally, if none of the above holds (e.g. for deterministics), the theano graph is compiled into a function and called (kind of ignoring `size`).
3. Finally, `generate_samples` should be made aware of `self.shape` through the `broadcast_shape` argument.
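To illustrate point 2, here is a minimal sketch of how `draw_values` treats different kinds of parameters (the model and values are purely illustrative):

``````
import numpy as np
import pymc3 as pm
from pymc3.distributions.distribution import draw_values

with pm.Model():
    p_t0 = pm.Dirichlet('p_t0', a=np.ones(5))  # a free random variable
    p_trans = np.full((5, 5), 0.2)             # a plain numpy array

    # p_trans is returned as-is; p_t0 has no value in `point`,
    # so its own random() method is called.
    p_t0_val, p_trans_val = draw_values([p_t0, p_trans], point=None, size=None)

    # If `point` provides a value for p_t0, that value is returned
    # directly and `size` is ignored (as in sample_posterior_predictive).
    p_t0_fixed, _ = draw_values([p_t0, p_trans], point={'p_t0': np.full(5, 0.2)})
``````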

I know it’s a lot of information; I should definitely write up a detailed guide on how to write custom `random` methods for distributions. I’ll do it when I have some spare time.

The `size` argument can either be an `int` (the normal case) or a tuple shape (kind of tricky). It controls the number of independent samples of your random variable that you want returned. It is not aware of the `shape` a single draw of the random variable has (i.e. if the RV is observed, its `shape` is modified by the `observed` kwarg, and calling `random(size=_size)` would end up returning an array of shape `tuple(_size) + RV.shape`). The `size` part of the output’s shape should always be at the beginning, so the `(5, 500)` is likely an error. What version of pymc3 are you using?
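As a quick illustration of that convention, here is a sketch using a stock distribution as a stand-in (the shapes are just examples):

``````
import numpy as np
import pymc3 as pm

d = pm.Normal.dist(mu=np.zeros(3), sd=1.)  # a single draw has shape (3,)
print(d.random(size=10).shape)             # (10, 3): size is prepended to the RV's shape
``````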

`draw_values` does not broadcast the drawn values with each other. We’ve recently added `pm.distributions.distribution.broadcast_distribution_samples` to help do that.
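Used on the drawn values, it would look roughly like this (a sketch; the variable names follow your `random` method):

``````
from pymc3.distributions.distribution import broadcast_distribution_samples

# Broadcast the drawn parameter values against each other, treating
# the leading `size` dimension specially.
p_t0_, p_trans_ = broadcast_distribution_samples([p_t0_, p_trans_], size=size)
``````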

Hi @lucianopaz,
Thanks! This was very informative.
My now-working `random` function is:

``````
import numpy as np
import pymc3 as pm
import theano.tensor as tt


class Markov1stOrder(pm.Categorical):
    """
    Hidden Markov Model states.

    Parameters
    ----------
    p_trans : tensor
        transition probabilities
        shape = (N_states, N_states)

    p_t0 : tensor
        starting probabilities
        shape = (N_states,)
    """

    def __init__(self, p_t0=None, p_trans=None, *args, **kwargs):
        # Skip pm.Categorical.__init__ (which expects p) and initialize pm.Discrete directly
        super(pm.Categorical, self).__init__(*args, **kwargs)
        self.p_trans = tt.as_tensor_variable(p_trans)  # Transition probabilities (1st-order trans probabilities)
        self.p_t0 = tt.as_tensor_variable(p_t0)  # Initial probabilities for t=0 (0th-order trans probabilities)
        self.mean = 0.
        self.mode = tt.cast(0, dtype='int64')

    def logp(self, x):
        """Log-likelihood of the chain given observed points (x)"""
        p_trans = self.p_trans
        p_t0 = self.p_t0

        # We need the probability of the next state given the current state, P(x_t | x_{t-1}).
        # Index into the transition matrix to generate categorical probabilities for each point
        # based on the previous point (all except the last)
        p_t = p_trans[x[:-1]]

        # x_i is what we are trying to predict
        x_i = x[1:]  # The first point is not transitioned to, so it is not scored by the transition distribution
        likelihood_t = pm.Categorical.dist(p_t).logp(x_i)  # Likelihood of transitioned-to points (dynamic process)
        return pm.Categorical.dist(p_t0).logp(x[0]) + tt.sum(likelihood_t)  # Static first point + dynamic next points

    def random(self, point=None, size=None):
        p_trans = self.p_trans  # Transition probabilities (1st-order trans probabilities)
        p_t0 = self.p_t0
        p_t0_, p_trans_ = pm.distributions.multivariate.draw_values([p_t0, p_trans], point=point, size=size)

        def single_dim_gen_sample(p_t0, p_trans, size, hmm_len):
            x_i = pm.distributions.dist_math.random_choice(p=p_t0, size=size)  # first sample of the chain
            x = [x_i]

            for _ in range(hmm_len - 1):
                x_i = pm.distributions.dist_math.random_choice(p=p_trans[x_i, :], size=size)  # all others
                x.append(x_i)

            return np.stack(x, -1)
        return pm.distributions.multivariate.generate_samples(single_dim_gen_sample,
                                                               p_t0=p_t0_, p_trans=p_trans_,
                                                               size=size,
                                                               # NOTE: the original post is cut off here; per point 3 above,
                                                               # presumably the chain length and broadcast shape were also passed:
                                                               hmm_len=int(self.shape[-1]),
                                                               broadcast_shape=(1,))
``````
This returns a sample of shape `(size, RV.shape)` (only tested so far when `shape` is scalar). I’ve yet to test with RVs as parameters, but I think that should be taken care of by `generate_samples` and `draw_values`, if I understood their function correctly.
Cheers @lucianopaz and @junpenglao (as always)!