I believe that using my own distribution will allow for later adjustments depending on my needs. Currently, I have the following class for the HMM states:
import numpy as np
import pymc as pm
import pytensor.tensor as pt
from typing import List, Tuple
from pytensor.tensor.random.op import RandomVariable
from pytensor.tensor.variable import TensorVariable

class HMMRV(RandomVariable):
    name: str = "hmm"
    ndim_supp: int = 1
    # One entry per parameter: p_init (vector), p_mat (matrix), len_traj (scalar), n_states (scalar)
    ndims_params: List[int] = [1, 2, 0, 0]
    dtype: str = "int64"
    _print_name: Tuple[str, str] = ("hmm", "\\operatorname{hmm}")

    @classmethod
    def _supp_shape_from_params(cls, dist_params, param_shapes) -> Tuple[int, ...]:
        # The support is one state per time step of the trajectory
        return (dist_params[2],)

    def __call__(self, p_init=None, p_mat=None, len_traj=None, n_states=None, size=None, **kwargs) -> TensorVariable:
        return super().__call__(p_init, p_mat, len_traj, n_states, size=size, **kwargs)

    @classmethod
    def rng_fn(
        cls,
        rng: np.random.RandomState,
        p_init: np.ndarray,
        p_mat: np.ndarray,
        len_traj: int,
        n_states: int,
        size: Tuple[int, ...],
    ) -> np.ndarray:
        if size is None:  # No explicit size: draw a single trajectory
            size = ()
        samples = np.zeros(size + (len_traj,), dtype=np.int64)
        for i in np.ndindex(size):
            # Draw the initial state from p_init
            current_state = rng.choice(np.arange(len(p_init)), p=p_init)
            samples[i + (0,)] = current_state  # Store initial state
            # Generate the rest of the trajectory using the transition matrix
            for j in range(1, len_traj):
                next_state = rng.choice(np.arange(len(p_init)), p=p_mat[current_state])
                samples[i + (j,)] = next_state
                current_state = next_state
        return samples
hmm = HMMRV()
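As a quick sanity check of the sampler (a minimal sketch; the two-state parameters below are made up purely for illustration), rng_fn can be called directly:

rng = np.random.default_rng(42)
p_init = np.array([0.9, 0.1])
p_mat = np.array([[0.8, 0.2],
                  [0.3, 0.7]])
# Three trajectories of length 5 -> output shape (3, 5)
print(HMMRV.rng_fn(rng, p_init, p_mat, len_traj=5, n_states=2, size=(3,)))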
class HMM(pm.Discrete):
    rv_op = hmm

    @classmethod
    def dist(cls, p_init, p_trans_mat, traj_length, n_states, *args, **kwargs):
        return super().dist([p_init, p_trans_mat, traj_length, n_states], **kwargs)

    def moment(rv, size, p_init, p_trans_mat, traj_length, n_states):  # TO BE IMPROVED !!!
        p_init_ = p_init.eval()
        p_trans_mat_ = p_trans_mat.eval()
        traj_length_ = traj_length.eval()
        size_ = size.eval()
        mode = np.zeros(shape=[size_[0], traj_length_])  # One row per trajectory in the batch
        current_state = np.argmax(p_init_, axis=-1)  # Most probable initial state
        mode[:, 0] = current_state  # Store the initial state
        for i in range(1, traj_length_):
            next_state = np.argmax(p_trans_mat_[current_state], axis=-1)
            mode[:, i] = next_state
            current_state = next_state
        mode = pt.as_tensor(mode)
        return mode
    def logp(value, p_init, p_trans_mat, traj_length, n_states):
        k = n_states
        value_clip = pt.clip(value, 0, k - 1)
        if value_clip.ndim == 1:
            value_clip = value_clip[None, :]
        value_init = value_clip[:, 0]
        # Pair each state s_0 .. s_{T-2} with its successor s_1 .. s_{T-1}
        value_trans1 = value_clip[:, 0:(traj_length - 1)]
        value_trans2 = value_clip[:, 1:traj_length]
        logp_init = pm.logp(pm.Categorical.dist(p=p_init), value_init)
        logp_trans = pm.logp(pm.Categorical.dist(p=p_trans_mat[value_trans1]), value_trans2)
        logp_trans = pt.sum(logp_trans, axis=-1)
        total_log = logp_init + logp_trans
        # Any out-of-range state invalidates the whole trajectory
        res = pt.switch(
            pt.any(pt.or_(pt.lt(value, 0), pt.gt(value, k - 1))),
            -np.inf,
            total_log,
        )
        return res
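To check the logp against a hand computation (again a sketch with arbitrary numbers), it can be evaluated on a fixed trajectory:

traj = np.array([0, 0, 1, 1])
dist = HMM.dist(
    p_init=np.array([0.9, 0.1]),
    p_trans_mat=np.array([[0.8, 0.2], [0.3, 0.7]]),
    traj_length=4,
    n_states=2,
)
# Expected: log(0.9) + log(0.8) + log(0.2) + log(0.7)
print(pm.logp(dist, traj).eval())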
And here is a dummy class for the emissions (simply a Gaussian emission that returns logp = -inf when the state index is out of range):
class EmissionsRV(RandomVariable):
    name: str = "Emissions"
    ndim_supp: int = 0
    # One entry per parameter: mu (vector), sigma (vector), index (scalar), n_states (scalar)
    ndims_params: List[int] = [1, 1, 0, 0]
    dtype: str = "floatX"
    _print_name: Tuple[str, str] = ("emissions", "\\operatorname{emissions}")

    def __call__(self, mu=None, sigma=None, index=None, n_states=None, size=None, **kwargs) -> TensorVariable:
        return super().__call__(mu, sigma, index, n_states, size=size, **kwargs)

    @classmethod
    def rng_fn(
        cls,
        rng: np.random.RandomState,
        mu: np.ndarray,
        sigma: np.ndarray,
        index: int,
        n_states: int,
        size: Tuple[int, ...],
    ) -> np.ndarray:
        if size is None:  # No explicit size: draw a single emission
            size = ()
        samples = np.zeros(size, dtype=np.float64)
        for i in np.ndindex(size):
            # Emit from the Gaussian attached to the indexed state
            samples[i] = rng.normal(loc=mu[index], scale=sigma[index])
        return samples
emissions = EmissionsRV()
class Emissions(pm.Continuous):
    rv_op = emissions

    @classmethod
    def dist(cls, mu, sigma, index, n_states, *args, **kwargs):
        return super().dist([mu, sigma, index, n_states], **kwargs)

    def moment(rv, size, mu, sigma, index, n_states):  # TO BE CHANGED !!!
        # Use the mean of the indexed state's Gaussian as the moment
        mode = mu[index]
        return mode

    def logp(value, mu, sigma, index, n_states):
        k = n_states
        index_clipped = pt.clip(index, 0, k - 1)
        # Gaussian logp for valid state indices, -inf otherwise
        res = pt.switch(
            pt.any(pt.or_(pt.lt(index, 0), pt.gt(index, k - 1))),
            -np.inf,
            pm.logp(pm.Normal.dist(mu=mu[index_clipped], sigma=sigma[index_clipped]), value),
        )
        return res
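For context, the two distributions are combined roughly like this (a minimal sketch, not my exact setup: n_states, traj_length, observed_data, and the priors are placeholders):

with pm.Model():
    p_init = pm.Dirichlet("p_init", a=np.ones(n_states))
    p_trans = pm.Dirichlet("p_trans", a=np.ones((n_states, n_states)))  # One Dirichlet row per state
    states = HMM("states", p_init=p_init, p_trans_mat=p_trans,
                 traj_length=traj_length, n_states=n_states)
    mu = pm.Normal("mu", mu=0.0, sigma=5.0, shape=n_states)
    sigma = pm.HalfNormal("sigma", sigma=1.0, shape=n_states)
    obs = Emissions("obs", mu=mu, sigma=sigma, index=states,
                    n_states=n_states, observed=observed_data)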
Now, even with simplistic generated data, the inference is not good: it fails to recover the state transitions and instead inflates the emission variances to compensate for the wrong states.
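One suspicion (unverified) is label switching: nothing identifies the states, so the sampler can permute them and widen sigma to cover the mismatch. A common mitigation, sketched below with PyMC's ordered transform, would be to constrain the emission means to be increasing (the initval must be strictly increasing for the transform to be valid); this would replace the mu prior in the sketch above:

    mu = pm.Normal(
        "mu",
        mu=0.0,
        sigma=5.0,
        shape=n_states,
        transform=pm.distributions.transforms.ordered,
        initval=np.linspace(-1.0, 1.0, n_states),
    )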
Regarding support for new features, I doubt I have the required skills, and I lack the time at the moment, so I'm afraid I won't be able to help for now.