Hi,
I’m trying to estimate the parameters of the Conditional Probability Distributions (CPDs) of a Hidden Markov Model from multiple short sequences (using PyMC3).
The CPD for the transitions between hidden states is a probability table.
The CPD for the emissions is a Beta distribution whose parameters depend on the state of the hidden variable.
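To make the setup concrete, the generative model (as implemented in the simulation code below) is

$$z_1 \sim \mathrm{Categorical}(\pi), \qquad z_t \mid z_{t-1} \sim \mathrm{Categorical}(P_{z_{t-1}}), \qquad x_t \mid z_t \sim \mathrm{Beta}(a_{z_t}, b_{z_t}),$$

where $P$ is the transition table and $(a_k, b_k)$ are the Beta parameters of hidden state $k$.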
I followed two previous posts:
(1) https://ericmjl.github.io/essays-on-data-science/machine-learning/markov-models/
(2) Sequence of Observed in a loop, how the log-likelihood are estimated? add them all?
By adapting the code from (1), I can successfully estimate all parameters when using a single sequence (e.g. n=100). Increasing the sequence length (e.g. from 100 to 300) makes the estimates noticeably more accurate and precise.
However, I can’t reproduce these results when using more than one sequence (here d=3).
I then tried the suggestion from (2), but that wasn’t successful either.
Any ideas what I’m doing wrong?
Reproducible code:
# import libraries
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import multinomial
from typing import List
import scipy.stats as stats
import pymc3 as pm
import theano.tensor as tt
import theano.tensor.slinalg as sla
import theano
import warnings
import arviz as az
warnings.simplefilter(action="ignore", category=FutureWarning)
theano.config.gcc.cxxflags = "-Wno-c++11-narrowing"
# analytic solution to obtain the equilibrium distribution
def equilibrium_distribution(p_transition):
    n_states = p_transition.shape[0]
    A = np.append(
        arr=p_transition.T - np.eye(n_states),
        values=np.ones(n_states).reshape(1, -1),
        axis=0
    )
    b = np.transpose(np.array([0] * n_states + [1]))
    p_eq = np.linalg.solve(
        a=np.transpose(A).dot(A),
        b=np.transpose(A).dot(b)
    )
    return p_eq
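# quick sanity check (illustrative): for the ground-truth matrix used below,
# equilibrium_distribution(np.array([[0.9, 0.1], [0.01, 0.99]]))
# returns approximately [0.0909, 0.9091], i.e. [1/11, 10/11].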
# create/sample states
def markov_sequence(p_init: np.array, p_transition: np.array, sequence_length: int) -> List[int]:
    """
    Generate a Markov sequence based on p_init and p_transition.
    """
    if p_init is None:
        p_init = equilibrium_distribution(p_transition)
    initial_state = list(multinomial.rvs(1, p_init)).index(1)
    states = [initial_state]
    for _ in range(sequence_length - 1):
        p_tr = p_transition[states[-1]]
        new_state = list(multinomial.rvs(1, p_tr)).index(1)
        states.append(new_state)
    return states
def beta_emissions(states: List[int], As: List[float], Bs: List[float]) -> List[float]:
    emissions = []
    for state in states:
        a = As[state]
        b = Bs[state]
        # draw a single scalar emission for this state
        e = stats.beta.rvs(a, b)
        emissions.append(e)
    return emissions
def generate(t, d, p_transition, a, b):
    p_init = np.array([1, 0])
    # transition probability matrix: each row must sum to one
    assert p_transition[0, :].sum() == 1
    assert p_transition[1, :].sum() == 1
    # generate the Markov sequences of hidden states
    states_np = np.full((t, d), np.nan)
    for i in range(d):
        states = markov_sequence(p_init, p_transition, sequence_length=t)
        states_np[:, i] = states
    # generate emissions from the hidden states
    beta_ems_np = np.full((t, d), np.nan)
    for i in range(d):
        beta_ems = beta_emissions(states_np[:, i].astype(int), As=a, Bs=b)
        beta_ems_np[:, i] = beta_ems
    x = np.arange(0, t)
    plt.plot(x, beta_ems_np)
    plt.show()
    return states_np, beta_ems_np
class HMMStates(pm.Categorical):
    def __init__(self, p_transition, p_equilibrium, n_states, *args, **kwargs):
        super(pm.Categorical, self).__init__(*args, **kwargs)
        self.p_transition = p_transition
        self.p_equilibrium = p_equilibrium
        self.k = n_states
        self.mode = tt.cast(0, dtype='int64')

    def logp(self, x):
        p_eq = self.p_equilibrium
        # broadcast out the transition probabilities so that the
        # log-likelihood calculation can be broadcast as well
        p_tr = self.p_transition[x[:-1]]
        # logp of the initial state, evaluated against the equilibrium probabilities
        initial_state_logp = pm.Categorical.dist(p_eq).logp(x[0])
        # logp of the remaining states
        x_i = x[1:]
        ou_like = pm.Categorical.dist(p_tr).logp(x_i)
        transition_logp = tt.sum(ou_like)
        return initial_state_logp + transition_logp
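# For reference, HMMStates.logp implements the usual Markov factorization:
#   log p(x) = log p_eq[x[0]] + sum_t log p_transition[x[t-1], x[t]]
# with both terms evaluated via Categorical log-probabilities.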
def solve_equilibrium(n_states, p_transition):
    A = tt.eye(n_states) - p_transition + tt.ones(shape=(n_states, n_states))
    p_equilibrium = pm.Deterministic("p_equilibrium", sla.solve(A.T, tt.ones(shape=(n_states))))
    return p_equilibrium
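# Note (standard result, stated here for clarity): the stationary distribution
# pi satisfies pi @ P = pi with sum(pi) = 1. Adding the all-ones matrix J folds
# the normalization constraint into one square system, (I - P + J).T @ pi = 1,
# which is what sla.solve computes above.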
class HMMBetaEmissions(pm.Continuous):
    def __init__(self, states, a, b, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.states = states
        self.a = a
        self.b = b

    def logp(self, x):
        """
        x: observations
        """
        states = self.states
        a = self.a[states]  # ground truth would be np.array([1, 10])[states]
        b = self.b[states]  # ground truth would be np.array([2, 1])[states]
        return tt.sum(pm.Beta.dist(alpha=a, beta=b).logp(x))
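# The emission term is sum_t log Beta(x[t]; a[z[t]], b[z[t]]): each observation
# is scored under the Beta parameters of its current hidden state.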
if __name__ == '__main__':
    # ground truth parameters of the transition CPD
    p_transition_sim = np.array(
        [[0.9, 0.1],
         [0.01, 0.99]]
    )
    # ground truth parameters of the emission CPD (Beta distribution)
    a_sim = [1, 10]
    b_sim = [2, 1]
Model from (1), single sequence:
    n = 100  # length of sequence
    d = 1  # number of sequences
    # generate data
    states, ems = generate(n, d, p_transition_sim, a_sim, b_sim)
    n_states = 2
    model1 = pm.Model()
    with model1:
        # priors for the transition matrix
        p_transition = pm.Dirichlet("p_transition", a=tt.ones((n_states, n_states)), shape=(n_states, n_states))
        # solve for the equilibrium state
        p_equilibrium = solve_equilibrium(n_states, p_transition)
        # HMM states
        hmm_states = HMMStates(
            "hmm_states",
            p_transition=p_transition,
            p_equilibrium=p_equilibrium,
            n_states=n_states,
            shape=(ems.shape[0], ems.shape[1]),
        )
        # priors for a and b
        a = pm.Exponential("a", lam=2, shape=(n_states,))
        b = pm.Exponential("b", lam=2, shape=(n_states,))
        # observed emission likelihood
        obs = HMMBetaEmissions(
            "emission",
            states=hmm_states,
            a=a,
            b=b,
            observed=ems
        )
    with model1:
        trace = pm.sample(2000, chains=2)
    az.plot_trace(trace, var_names=["a"]);
    az.plot_trace(trace, var_names=["b"]);
    az.plot_trace(trace, var_names=["p_transition"]);
Results: The parameters can be estimated. The fit isn’t perfect, but increasing the sequence length gives better results.
Model from (1), multiple sequences:
    n = 100  # length of sequence
    d = 3  # number of sequences
    # generate data
    states, ems = generate(n, d, p_transition_sim, a_sim, b_sim)
    n_states = 2
    model1 = pm.Model()
    with model1:
        # priors for the transition matrix
        p_transition = pm.Dirichlet("p_transition", a=tt.ones((n_states, n_states)), shape=(n_states, n_states))
        # solve for the equilibrium state
        p_equilibrium = solve_equilibrium(n_states, p_transition)
        # HMM states
        hmm_states = HMMStates(
            "hmm_states",
            p_transition=p_transition,
            p_equilibrium=p_equilibrium,
            n_states=n_states,
            shape=(ems.shape[0], ems.shape[1]),
        )
        # priors for a and b
        a = pm.Exponential("a", lam=2, shape=(n_states,))
        b = pm.Exponential("b", lam=2, shape=(n_states,))
        # observed emission likelihood
        obs = HMMBetaEmissions(
            "emission",
            states=hmm_states,
            a=a,
            b=b,
            observed=ems
        )
    with model1:
        trace = pm.sample(2000, chains=2)
    az.plot_trace(trace, var_names=["a"]);
    az.plot_trace(trace, var_names=["b"]);
    az.plot_trace(trace, var_names=["p_transition"]);
Results: Although more observations are available than in the first example, the parameter estimates are poorer. Even when I increase the number of sequences to d=1000, I’m not able to recover the parameters.
Model from (2), single sequence:
    n = 100  # length of sequence
    d = 1  # number of sequences
    # generate data
    states, ems = generate(n, d, p_transition_sim, a_sim, b_sim)
    model2 = pm.Model()
    with model2:
        # priors for the transition matrix
        p_transition = pm.Dirichlet("p_transition", a=tt.ones((n_states, n_states)), shape=(n_states, n_states))
        # priors for a and b
        a = pm.Exponential("a", lam=2, shape=(n_states,))
        b = pm.Exponential("b", lam=2, shape=(n_states,))
        # solve for the equilibrium state
        p_equilibrium = solve_equilibrium(n_states, p_transition)
        hmm_states_list = []
        emissions_list = []
        for i in range(d):
            # HMM states for sequence i
            hmm_states_i = HMMStates(
                "hmm_states_" + str(i),
                p_transition=p_transition,
                p_equilibrium=p_equilibrium,
                n_states=n_states,
                shape=(ems.shape[0], 1),
            )
            # observed emission likelihood for sequence i
            obs_i = HMMBetaEmissions(
                "emission_" + str(i),
                states=hmm_states_i,
                a=a,  # ground truth: np.array([1, 10])
                b=b,  # ground truth: np.array([2, 1])
                observed=ems[:, i]
            )
            hmm_states_list += [hmm_states_i]
            emissions_list += [obs_i]
    with model2:
        trace = pm.sample(2000, chains=2)
    az.plot_trace(trace, var_names=["a"]);
    az.plot_trace(trace, var_names=["b"]);
    az.plot_trace(trace, var_names=["p_transition"]);
Results: The adaptation of (2) doesn’t seem to work either.
Model from (2), multiple sequences:
    n = 100  # length of sequence
    d = 3  # number of sequences
    # generate data
    states, ems = generate(n, d, p_transition_sim, a_sim, b_sim)
    model2 = pm.Model()
    with model2:
        # priors for the transition matrix
        p_transition = pm.Dirichlet("p_transition", a=tt.ones((n_states, n_states)), shape=(n_states, n_states))
        # priors for a and b
        a = pm.Exponential("a", lam=2, shape=(n_states,))
        b = pm.Exponential("b", lam=2, shape=(n_states,))
        # solve for the equilibrium state
        p_equilibrium = solve_equilibrium(n_states, p_transition)
        hmm_states_list = []
        emissions_list = []
        for i in range(d):
            # HMM states for sequence i
            hmm_states_i = HMMStates(
                "hmm_states_" + str(i),
                p_transition=p_transition,
                p_equilibrium=p_equilibrium,
                n_states=n_states,
                shape=(ems.shape[0], 1),
            )
            # observed emission likelihood for sequence i
            obs_i = HMMBetaEmissions(
                "emission_" + str(i),
                states=hmm_states_i,
                a=a,  # ground truth: np.array([1, 10])
                b=b,  # ground truth: np.array([2, 1])
                observed=ems[:, i]
            )
            hmm_states_list += [hmm_states_i]
            emissions_list += [obs_i]
    with model2:
        trace = pm.sample(2000, chains=2)
    az.plot_trace(trace, var_names=["a"]);
    az.plot_trace(trace, var_names=["b"]);
    az.plot_trace(trace, var_names=["p_transition"]);