Outage Modeling with Scan

I’m trying to model some factory outage data that looks like the following.

The idea initially is to model each of these factories independently. I was thinking of doing this by modeling the magnitude of an outages as a gamma and whether the plant is on outage as Bernoulli, and then the duration as geometric. This would look something like

import pymc as pm
import numpy as np

FACTORIES = ["FactoryA", "FactoryB"]
N = 365 * 4
DATES = np.arange(N)

with pm.Model() as outage_model:
    outage_model.add_coord("factory_name", FACTORIES, mutable=False)
    outage_model.add_coord("date", DATES)

    outage_magnitude = pm.Gamma("outage_magnitude", alpha=1, beta=1, dims=["date", "factory_name"])
    outage_start = pm.Bernouli("outage_start", p=0.1, dims=["date", "factory_name"])
    outage_duration = pm.Geometric("outage_duration", p=0.1, dims=["date", "factory_name"])

    outage = pm.Deterministic(
        "outage",
        some_function(outage_magnitude * outage_start, outage_duration),
        dims=["date", "factory_name"],
        observed=outage_data,
    )
  

But I’m struggling to understand how to write some_func. I think that can be done using scan but exactly how eludes me. If I were to think generatively about how to write this in pure python, I would use something like

import numpy as np
import matplotlib.pyplot as plt

FACTORIES = ["FactoryA"]
N = 365 * 4

# 0 inflated magnitude
outage_magnitude = np.random.gamma(
    shape=1, scale=0.2, size=(N, len(FACTORIES))
) * np.random.binomial(n=1, p=0.02, size=(N, len(FACTORIES)))
outage_duration = np.random.geometric(p=0.1, size=(N, len(FACTORIES)))

outages = np.zeros((N + outage_duration.max(), len(FACTORIES)))
for i in range(N):
    for j in range(len(FACTORIES)):
        dur = outage_duration[i, j]
        new_outage = np.tile(outage_magnitude[i, j], dur)
        outages[i : i + dur, j] = outages[i : i + dur, j] + new_outage
        
outages =  outages[:N]

plt.plot(outages)

Something like a Hidden Markov Model may be better so you can marginalize the state variables, and recover them after you infer the continuous parameters

Ricardo is right, what you have here is an HMM. Just as an object lesson though, here’s how I converted your proposed data generating process to a scan model. The final goal is to draw the random variables inside the loop rather that all at once. This will put you in the right mindset to write a scan inner function. I also wanted to vectorize away the inner loop. My strategy is:

  1. Tile everything to dur.max()
  2. Make a boolian max to set values less than dur.max() back to zero

Here’s where I landed:

import numpy as np
import matplotlib.pyplot as plt

seed = 12345
rng = np.random.default_rng(seed)

FACTORIES = ["FactoryA", 'FactoryB']
n_factories = len(FACTORIES)
N = 365 * 4


outage_prob = 0.02
restoration_prob = 0.1
outage_mag_alpha = 1
outage_mag_beta = 0.2

outages = np.zeros((N, n_factories))

for i in range(N):
    has_outage = rng.binomial(n=1, p=outage_prob, size=n_factories)
    outage_magnitude = rng.gamma(shape=outage_mag_alpha, scale=outage_mag_beta, size=n_factories)
    outage_duration = rng.geometric(p=restoration_prob, size=n_factories)
    
    max_dur = outage_duration.max()
    
    new_outage = np.tile(outage_magnitude * has_outage, max_dur).reshape(-1, n_factories)
    outage_mask = np.arange(max_dur).repeat(n_factories).reshape(-1, n_factories) < dur
    
    next_slice = slice(i, min(i + max_dur, N))
    slice_len = next_slice.stop - next_slice.start
    outages[next_slice] = outages[next_slice] + (new_outage * outage_mask)[:slice_len]
        
plt.plot(outages)

image

As you can see, the process is the same as yours. The difference is we can now map this one-to-one into a pytensor scan. The inner function will just be the inner loop, with np changed to pt, plus some boiler plate:

from pymc.pytensorf import collect_default_updates
import pytensor 

def step(t, outages, outage_prob, restoration_prob, outage_mag_alpha, outage_mag_beta, n_factories):
    has_outage = pm.Binomial.dist(n=1, p=outage_prob, shape=n_factories)
    outage_magnitude = pm.Gamma.dist(alpha=outage_mag_alpha, beta=outage_mag_beta, shape=n_factories)
    outage_duration = pm.Geometric.dist(p=restoration_prob, shape=n_factories)
    
    max_dur = outage_duration.max()
    new_shape = (-1, n_factories)
    
    new_outage = pt.tile(outage_magnitude * has_outage, max_dur).reshape(new_shape)
    outage_mask = pt.lt(pt.arange(max_dur).repeat(n_factories).reshape(new_shape), dur)
    
    slice_max = pt.minimum(t + max_dur, outages.shape[0])
    next_slice = slice(t, slice_max)
    slice_len = next_slice.stop - next_slice.start
    outages = pt.set_subtensor(outages[next_slice], 
                               outages[next_slice] + (new_outage * outage_mask)[:slice_len])
    
    return outages, collect_default_updates(outages)

with pm.Model(coords={'factory':FACTORIES, 'time':np.arange(N)}) as m:
    outages = pt.zeros((N, n_factories))
    outages, updates = pytensor.scan(step, 
                                     non_sequences=[outage_prob, restoration_prob, outage_mag_alpha, outage_mag_beta, n_factories],
                                     outputs_info=[outages],
                                     sequences=[pt.arange(N, dtype='int32')])
    outages = pm.Deterministic('outages', outages[-1], dims=['time', 'factory']
    assert updates

pm_outages = pm.draw(outages)
plt.plot(pm_outages)

image

As promised, we get the same answer. Now you won’t actually be able to estimate this model, because slicing and assignment Ops are not measurable – PyMC doesn’t know how to derive the log probability from generative models that use these.

(For all you ever wanted to know about what kinds of generative models PyMC can turn into logp, check out @ricardoV94 's excellent puzzle notebook)

So what you can do instead is think about two states: operating and outage. If your code, p(\text{operating} | \text{outage}) = 0.1 (what I called the “restoration probability” and p(\text{outage} | \text{operating}) = 0.02. So the whole state transition matrix will be:

P = \begin{bmatrix} 0.98 & 0.02 \\ 0.1 & 0.9 \end{bmatrix}

Where state 1 is “operating” and state 2 is “outage”. In either case the function is going to be autoregressive in the hidden states. Calling the state z_t, if z_{t-1} = 0 (operating) and z_t = 1 (outage), we get

\mu_t = \zeta_t

Where \zeta_t \sim \text{Gamma}(\cdot) is the outage magnitude. If z_{t-1} = 1 and z_t = 0, then \mu = 0 (the outage is repaired). Otherwise, we have \mu_t = \mu_{t-1}. To put this all together, define \Delta z_t = z_t - z_{t-1}, then:

\mu_t = (1 - [\Delta z_t < 0]) \cdot ([\Delta z_t = 0] \mu_{t-1} + [\Delta z_t > 0] \zeta_t)

PyMC model using pmx.DiscreteMarkovChain:

from pymc_experimental.distributions import DiscreteMarkovChain

coords = {
    'state':['operating', 'outage'],
    'time':np.arange(N),
    'factory':FACTORIES
}

with pm.Model(coords=coords) as m:
    P = pt.as_tensor_variable(np.array([[1 - outage_prob, outage_prob],
                                        [restoration_prob, 1 - restoration_prob]]))
    hidden_states = DiscreteMarkovChain('state', 
                                        P=P, 
                                        dims=['factory', 'time'],
                                        init_dist=pm.DiracDelta.dist([0, 0]))
    
    def step(state, state_tm1, mu_tm1, outage_mag_alpha, outage_mag_beta):
        Dz = state - state_tm1
        
        repaired = pt.lt(Dz, 0)
        outage = pt.gt(Dz, 0)
        cont = pt.eq(Dz, 0)
        
        outage_magnitude = pm.Gamma.dist(alpha=outage_mag_alpha, beta=outage_mag_beta, shape=state.shape[0])
        
        mu = (1 - repaired) * (cont * mu_tm1 + outage * outage_magnitude)
        return mu, collect_default_updates(mu)
    
    outage, updates = pytensor.scan(step, 
                                    sequences=[{'input':hidden_states.T, 'taps':[0, -1]}],
                                    outputs_info=[pt.zeros(n_factories)],
                                    non_sequences=[outage_mag_alpha, outage_mag_beta],
                                    strict=True)
plt.plot(pm.draw(outage))

image

As you can see, the output is almost the same. The biggest difference is that this model can never have a “double emission”, where you see an additional outage jump during an outage. I didn’t know if this was a feature or a bug – if it’s a feature, you can just add another random variable inside the inner function for a “outage enhancement”, and add it into the computation of mu.

The advantage of doing things this way is that now the inner function is composed only of functions that are measurable, so you should be able to wrap this into a CustomDist and do inference on it.

5 Likes

Thanks @jessegrabowski this is a very helpful summary. So what I am actually interested in is inference and then simulation, so it seems like your approach of using a HMM will be more fruitful.

The double emission is in fact a feature not a bug, so reformulating what you have above I think it could be expressed as

\mu_t = (1 - [\Delta z_t < 0]) \cdot ([\Delta z_t = 0] (\mu_{t-1} + b_t \zeta_t) + [\Delta z_t > 0] \zeta_t)

where b_t \sim Bernoulli

or in code something like

def step(state, state_tm1, mu_tm1, outage_mag_alpha, outage_mag_beta, outage_update):
    Dz = state - state_tm1

    repaired = pt.lt(Dz, 0)
    outage = pt.gt(Dz, 0)
    cont = pt.eq(Dz, 0)

    outage_magnitude = pm.Gamma.dist(
        alpha=outage_mag_alpha, beta=outage_mag_beta, shape=state.shape[0]
    )
    outage_update = pm.Bernoulli.dist(
        p=outage_prob, shape=state.shape[0]
    )
    mu = (1 - repaired) * (cont * (mu_tm1 + outage_update * outage_magnitude) + outage * outage_magnitude)
    return mu, collect_default_updates(mu)

A few follow up questions:

  1. In your expression for \mu_t is there a reason you have your first term as [\Delta z_t < 0] instead of [z_t = 0])?
  2. Are there any advantages to specifying the many emissions levels model as a 3 state HMM model instead of how I specified it above? i.e. something like
outage_prob = 0.02
restoration_prob = 0.1
outage_update = 0.1

P = pt.as_tensor_variable(
    np.array(
        [
            # stay operating, move to outage, move to second outage state
            [1 - outage_prob, outage_prob, 0],
            # go back on, stay in 1st outage state, move to 2nd outage state
            [restoration_prob, 1 - restoration_prob - outage_update, outage_update],
            # go back on, move to 1st outage, stay in 2nd outage state
            [restoration_prob, outage_update, 1 - restoration_prob - outage_update]
        ],
    )
)
  1. For inference, since I want to pass in observed to something I’ve defined as Deterministic, I think I need to wrap this in something like the following
with m:
    outage_obs = pm.Normal("outage_obs", mu=outage, sigma=0.001, observed=outage_data)
    idata = pm.sample()

but this is raising a NotImplementedError: Logprob method not implemented for DiscreteMarkovChainRV{inline=True} . I’ve taken a look at this notebook that @ricardoV94 posted on doing inference on a markov chain directly pymc-experimental/notebooks/discrete_markov_chain.ipynb at main · pymc-devs/pymc-experimental · GitHub but seem to also be getting the same error?

Yes, automatic logp inference for the DiscreteMarkovChain variable isn’t supported yet, it’s on my to-do list. Notice that in the example, it uses the HMM as a mean with gaussian noise, so it’s not 100% correct.

You could probably just ignore the DiscreteMarkovChain random variable for now and handle the state transitions inside the scan. Just pass the P matrix as a non_sequence, and use it to parameterize a Bernoulli at each step.

Your solution for additional jumps was the same as what I had in mind. The only glitch is that it doesn’t have memory of what the last shocked level was – when the state transitions back to “repaired”, it will jump all the way to zero, rather than a deque of shocks.

Having more states in the HMM might be a way to fix it; you’ll have to carry the information about the levels of all the states “under” the current state. But that should be possible, you’ll just have a mu1 and mu2, and when the state transitions from 0->1 you update mu1 and keep carrying it along. When the state transitions from 1->2 you update mu2 = mu1 + shock and carry it along, then when you transitions from 2->1 you bring back mu1 and set mu2 back to 0, when you transition from 1->0 set mu1 back to 0. Define mu = pt.stack([0, mu1, mu2]), then the current value is just mu[state] (after you apply all the update logic at each time step)

No reason, I was just being cute.

If you use pm.CustomDist you won’t need to do this. But to get access to custom dist you can’t use the DiscreteMarkovChain, as noted above. I recommend trying to get it up and running just with Bernoulli RVs inside the scan.