Hazard Rates for Markov Chain Model

Hi, I'm building a model of sleep stage transitions.
I’ve successfully implemented a Markov chain model where the next sleep stage depends on the previous one.
If current_stage is either wake (0) or sleep (1), the next stage can be sampled categorically from a transition matrix.
For example:

import numpy as np
import pymc3 as pm

with pm.Model() as model:
    # one Dirichlet row of transition probabilities per current stage
    trans_matrix = pm.Dirichlet('trans_prob', a=np.array([[0.3, 0.7], [0.5, 0.5]]),
                                shape=(2, 2),
                                testval=np.ones((2, 2)) / 2)
    # select the row of probabilities matching each observed current stage
    trans_matrix_broadcast = trans_matrix[current_stage, :]
    obs = pm.Categorical('obs', p=trans_matrix_broadcast, observed=next_stage)
    trace = pm.sample()
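(Here current_stage and next_stage would be built from the epoch-by-epoch stage labels offset by one step, e.g.:)

stages = np.asarray(hypnogram)   # hypothetical array of 0/1 stage labels per epoch
current_stage = stages[:-1]      # stage at time t
next_stage = stages[1:]          # stage at time t + 1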

The values in the transition matrix represent the probability of transitioning in the next time step. This can be seen as equivalent to the hazard rate in the survival analysis context. In the above example the values in the transition probability matrix are constant over time, and therefore the duration of each sleep stage will be geometrically distributed (the discrete-time analogue of an exponential).
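As a quick sanity check of that claim (a standalone sketch, not part of the model; p_stay is an assumed constant self-transition probability):

import numpy as np

rng = np.random.default_rng(0)
p_stay = 0.5                      # assumed constant self-transition probability
durations = []
for _ in range(100_000):
    d = 1
    while rng.random() < p_stay:  # each epoch, stay with probability p_stay
        d += 1
    durations.append(d)

# constant per-epoch hazard (1 - p_stay) => geometric durations
print(np.mean(durations))         # ~2.0, i.e. 1 / (1 - p_stay)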
However, I want to change the transition probabilities to be dependent on time.
My sleep stage durations are Pareto distributed, and therefore I want the on-diagonal values of the transition probability matrix (alpha) to be alpha/time, which is the hazard rate of a Pareto.
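For reference, that hazard rate comes from the standard Pareto survival and density functions (x_m is the Pareto scale parameter):

S(t) = (x_m / t)^alpha, for t >= x_m
f(t) = alpha * x_m^alpha / t^(alpha + 1)
h(t) = f(t) / S(t) = alpha / t

I'm not sure how to proceed. One attempt was: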

import theano.tensor as tt

with pm.Model() as par_model_fit:
    LowerBoundedNormal = pm.Bound(pm.Normal, lower=0)
    alpha = LowerBoundedNormal('alpha', mu=1.0, sd=1e2,
                               shape=(len(stages_to_consider), len(stages_to_consider)))
    m = epoch_len

    def logp(current_stage, next_stage, time):
        # parameters for each transition, conditional on the current stage
        a_rows = alpha[current_stage, :]
        # self-transitions (stay in the current stage) with the Pareto hazard alpha/time
        self_trans_a = a_rows[tt.arange(0, current_stage.shape[0]), current_stage] / time
        # transitions out of the current stage (complement of the on-diagonal values)
        other_trans_a = self_trans_a[:, np.newaxis] / m - a_rows / time[:, np.newaxis]
        # put the self-transition values back on the diagonal positions
        p_trans = tt.set_subtensor(other_trans_a[tt.arange(0, current_stage.shape[0]), current_stage], self_trans_a)
        cat = pm.Categorical.dist(p=p_trans)
        return cat.logp(next_stage)

    next_obs = pm.DensityDist('survival', logp,
                              observed={'current_stage': data['current_epoch'],
                                        'time': data['time'],
                                        'next_stage': data['next_epoch']})
    trace = pm.sample()

This samples fine, but my alpha values come out much higher than expected. I think this is because p is not normalized. My questions are:
A. Is there a better way to model this than a custom DensityDist?
B. If not, how can I enforce that the elements of p lie between 0 and 1 and sum to one?
THANKS!

I believe I've fixed my own issues.
I used a softmax to enforce the constraint that the rows of p_trans sum to one.
I also realized I need better priors on alpha, so I don't get wild alphas in the first place. I'll post the complete solution here when I've ironed out the kinks.
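Roughly, the softmax change inside logp looks like this (a sketch, not the final code):

# replace the unnormalized p with a row-wise softmax before building the Categorical
p_trans = tt.nnet.softmax(p_trans)  # each row now sums to one
cat = pm.Categorical.dist(p=p_trans)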


You might find this discussion helpful when it comes to putting a softmax in your model: Multinomial hierarchical regression with multiple observations per group (“Bad energy issue”)

Awesome, I'll check it out :smile:

Just for anyone who comes across this in the future, here is the solution that worked:

with pm.Model() as model:
    # both parameter matrices are constrained to [0, 1]
    BoundedNormal = pm.Bound(pm.Normal, lower=0, upper=1)
    alpha = BoundedNormal('alpha', mu=priors['alpha'],
                          sd=1e5, shape=(nstages, nstages))
    trans_baserate = BoundedNormal('trans_baserate', mu=priors['trans_baserate'],
                                   sd=1e5, shape=(nstages, nstages))

    def logp(current_epoch, next_epoch, tau):
        current_idx = tt.cast(current_epoch, 'int16')
        next_idx = tt.cast(next_epoch, 'int16')
        tau = tt.as_tensor_variable(tau[:, np.newaxis])
        # time-dependent part: Pareto hazard alpha / tau, one row per current stage
        alpha_t = alpha[current_idx, :] / tau
        # time-independent baseline transition probability
        trans_baserate_t = trans_baserate[current_idx, :]
        cat = pm.Categorical.dist(p=alpha_t + trans_baserate_t)
        return cat.logp(next_idx)

    next_epoch = pm.DensityDist('next_epoch', logp,
                                observed={'current_epoch': data['current_epoch'],
                                          'tau': data['tau'],
                                          'next_epoch': data['next_epoch']})
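Sampling and inspecting the fit then follows the usual pattern (my addition for completeness, not part of the original snippet):

with model:
    trace = pm.sample(2000, tune=2000)
print(pm.summary(trace))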