Using pytensor.scan to simulate from the posterior predictive

I’m working on a toy discrete survival model (code and data provided below). You can think of this as watching a group of people over time, making note of when each of them get some outcome (could be dying, could be buying some product, etc).

The model estimates the conversion rate at each tiemstep, and I can get the survival function by computing surv_func = pm.math.cumprod(1-conversion_rate). To validate the model, I want to be able to simulate what would happen to a group of N new people. Were I to write this in pure python, I would need a for loop (since the number of people who are at risk for the outcome in the current timestep depends on how many people got the outcome in the last timestep). Roughly the calculation in python is

import numpy as np

# Example values
at_risk = 1000 
timesteps = 14
# Would be estimated in the model
p = np.random.uniform(0, 0.1, size = timesteps)

# Calculation
conversions = np.zeros(timesteps)
for i in range(timesteps):

    n_event = np.random.binomial(n=at_risk - conversions.sum(), p = p[i])
    conversions[i] = n_event

    

I’m interested in adding the equivalent to the conversions array to my model below. I understand this involves using pytensor scan, but not really sure how I would use that function for the pattern I’ve described above.


import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import gamma
from collections import Counter
import pymc as pm
import arviz as az
import pytensor
import pytensor.tensor as pt


times = np.arange(0, 15)
dist = gamma(a=2.0, scale = 3.0)

# Probability of converting between t_j and t_{j+1}
proba = dist.cdf(times[1::]) - dist.cdf(times[:-1])

conversion_days = np.r_[times[:-1], np.inf]
conversion_proba = np.r_[proba, 1-proba.sum()]
conversions = np.random.choice(a=conversion_days, p=conversion_proba, size = 1000)

counts = Counter(np.sort(conversions))

weights = []
failure_times = []
at_risk = [counts.total()]

for j in counts.keys():

    if np.isinf(j):
        pass
    else:
        failure_times.append(j)
        weights.append(counts[j])
        at_risk.append(at_risk[-1] - counts[j])


weights = np.array(weights)
failure_times = np.array(failure_times)
at_risk = np.array(at_risk[:-1])

coords = {
    'timestep':range(failure_times.size)
}


with pm.Model(coords=coords) as model:

    mu = pm.Normal('mu', -3, .1)
    z = pm.Normal('z',0, 1, dims='timestep')
    sigma = pm.HalfNormal('sigma',sigma=1)

    logit_p = pm.Deterministic('logit_p', mu + z*sigma)
    p = pm.Deterministic('p', pm.math.invlogit(logit_p))
    surv_func = pm.Deterministic('surv_func', pm.math.cumprod(1-p), dims='timestep')

    y = pm.Binomial('conversions', p=p, n=at_risk, observed = weights)

I’d start by reading the scan docs, if you haven’t already. Then once you take a crack at an implementation, I can try to help debug any problems you run into. The high-level summary is that you will need to write an inner function (the body of your for loop), then provide the inputs to that function to the correct category of scan argument (sequence, non_sequence, or outputs_info)