Ricardo is right, what you have here is an HMM. Just as an object lesson though, here’s how I converted your proposed data generating process to a scan model. The final goal is to draw the random variables inside the loop rather that all at once. This will put you in the right mindset to write a scan inner function. I also wanted to vectorize away the inner loop. My strategy is:
- Tile everything to
dur.max()
- Make a boolian max to set values less than
dur.max() back to zero
Here’s where I landed:
import numpy as np
import matplotlib.pyplot as plt
seed = 12345
rng = np.random.default_rng(seed)
FACTORIES = ["FactoryA", 'FactoryB']
n_factories = len(FACTORIES)
N = 365 * 4
outage_prob = 0.02
restoration_prob = 0.1
outage_mag_alpha = 1
outage_mag_beta = 0.2
outages = np.zeros((N, n_factories))
for i in range(N):
has_outage = rng.binomial(n=1, p=outage_prob, size=n_factories)
outage_magnitude = rng.gamma(shape=outage_mag_alpha, scale=outage_mag_beta, size=n_factories)
outage_duration = rng.geometric(p=restoration_prob, size=n_factories)
max_dur = outage_duration.max()
new_outage = np.tile(outage_magnitude * has_outage, max_dur).reshape(-1, n_factories)
outage_mask = np.arange(max_dur).repeat(n_factories).reshape(-1, n_factories) < dur
next_slice = slice(i, min(i + max_dur, N))
slice_len = next_slice.stop - next_slice.start
outages[next_slice] = outages[next_slice] + (new_outage * outage_mask)[:slice_len]
plt.plot(outages)

As you can see, the process is the same as yours. The difference is we can now map this one-to-one into a pytensor scan. The inner function will just be the inner loop, with np changed to pt, plus some boiler plate:
from pymc.pytensorf import collect_default_updates
import pytensor
def step(t, outages, outage_prob, restoration_prob, outage_mag_alpha, outage_mag_beta, n_factories):
has_outage = pm.Binomial.dist(n=1, p=outage_prob, shape=n_factories)
outage_magnitude = pm.Gamma.dist(alpha=outage_mag_alpha, beta=outage_mag_beta, shape=n_factories)
outage_duration = pm.Geometric.dist(p=restoration_prob, shape=n_factories)
max_dur = outage_duration.max()
new_shape = (-1, n_factories)
new_outage = pt.tile(outage_magnitude * has_outage, max_dur).reshape(new_shape)
outage_mask = pt.lt(pt.arange(max_dur).repeat(n_factories).reshape(new_shape), dur)
slice_max = pt.minimum(t + max_dur, outages.shape[0])
next_slice = slice(t, slice_max)
slice_len = next_slice.stop - next_slice.start
outages = pt.set_subtensor(outages[next_slice],
outages[next_slice] + (new_outage * outage_mask)[:slice_len])
return outages, collect_default_updates(outages)
with pm.Model(coords={'factory':FACTORIES, 'time':np.arange(N)}) as m:
outages = pt.zeros((N, n_factories))
outages, updates = pytensor.scan(step,
non_sequences=[outage_prob, restoration_prob, outage_mag_alpha, outage_mag_beta, n_factories],
outputs_info=[outages],
sequences=[pt.arange(N, dtype='int32')])
outages = pm.Deterministic('outages', outages[-1], dims=['time', 'factory']
assert updates
pm_outages = pm.draw(outages)
plt.plot(pm_outages)

As promised, we get the same answer. Now you won’t actually be able to estimate this model, because slicing and assignment Ops are not measurable – PyMC doesn’t know how to derive the log probability from generative models that use these.
(For all you ever wanted to know about what kinds of generative models PyMC can turn into logp, check out @ricardoV94 's excellent puzzle notebook)
So what you can do instead is think about two states: operating and outage. If your code, p(\text{operating} | \text{outage}) = 0.1 (what I called the “restoration probability” and p(\text{outage} | \text{operating}) = 0.02. So the whole state transition matrix will be:
P = \begin{bmatrix} 0.98 & 0.02 \\ 0.1 & 0.9 \end{bmatrix}
Where state 1 is “operating” and state 2 is “outage”. In either case the function is going to be autoregressive in the hidden states. Calling the state z_t, if z_{t-1} = 0 (operating) and z_t = 1 (outage), we get
\mu_t = \zeta_t
Where \zeta_t \sim \text{Gamma}(\cdot) is the outage magnitude. If z_{t-1} = 1 and z_t = 0, then \mu = 0 (the outage is repaired). Otherwise, we have \mu_t = \mu_{t-1}. To put this all together, define \Delta z_t = z_t - z_{t-1}, then:
\mu_t = (1 - [\Delta z_t < 0]) \cdot ([\Delta z_t = 0] \mu_{t-1} + [\Delta z_t > 0] \zeta_t)
PyMC model using pmx.DiscreteMarkovChain:
from pymc_experimental.distributions import DiscreteMarkovChain
coords = {
'state':['operating', 'outage'],
'time':np.arange(N),
'factory':FACTORIES
}
with pm.Model(coords=coords) as m:
P = pt.as_tensor_variable(np.array([[1 - outage_prob, outage_prob],
[restoration_prob, 1 - restoration_prob]]))
hidden_states = DiscreteMarkovChain('state',
P=P,
dims=['factory', 'time'],
init_dist=pm.DiracDelta.dist([0, 0]))
def step(state, state_tm1, mu_tm1, outage_mag_alpha, outage_mag_beta):
Dz = state - state_tm1
repaired = pt.lt(Dz, 0)
outage = pt.gt(Dz, 0)
cont = pt.eq(Dz, 0)
outage_magnitude = pm.Gamma.dist(alpha=outage_mag_alpha, beta=outage_mag_beta, shape=state.shape[0])
mu = (1 - repaired) * (cont * mu_tm1 + outage * outage_magnitude)
return mu, collect_default_updates(mu)
outage, updates = pytensor.scan(step,
sequences=[{'input':hidden_states.T, 'taps':[0, -1]}],
outputs_info=[pt.zeros(n_factories)],
non_sequences=[outage_mag_alpha, outage_mag_beta],
strict=True)
plt.plot(pm.draw(outage))

As you can see, the output is almost the same. The biggest difference is that this model can never have a “double emission”, where you see an additional outage jump during an outage. I didn’t know if this was a feature or a bug – if it’s a feature, you can just add another random variable inside the inner function for a “outage enhancement”, and add it into the computation of mu.
The advantage of doing things this way is that now the inner function is composed only of functions that are measurable, so you should be able to wrap this into a CustomDist and do inference on it.