I’m building a purely additive model, with no intercept term, where

Y = B_1 * X_1 + B_2 * X_2 + ... + B_n * X_n

In this case Y represents the hours a person worked during the day, and each X_i is the count of tasks of type i they completed during the day. Here’s a synthetic dataset for this problem:
import copy
import numpy as np
import pandas as pd
import pymc as pm
import pytensor
import pytensor.tensor as pt
from typing import List
np.random.seed(1234)
task_number = 300
record_number = 20 * task_number
# create random matrix of tasks completed for each record/day
df = pd.DataFrame()
df['time_taken'] = np.random.normal(loc=8, scale=1, size=record_number) # Roughly 8 hour work day
tasks = np.arange(1, task_number + 1)
for i in tasks:
df[f'task_{i}'] = 0
for i in df.index:
# Randomly select 3 to 5 tasks completed for each record (same task can be completed multiple times per day)
cpt_tasks = np.random.choice(tasks, np.random.randint(3, 6), replace=True)
for ct in cpt_tasks:
df.loc[i, f'task_{ct}'] += 1
df.head()
task_columns = [c for c in df.columns if 'task_' in c]
df[task_columns] = df[task_columns].astype(float)
task_length_guesses = {f'task_{i}': 2.0 for i in tasks} # Naive starting guess of 2 hours per task
sigma_lam = 1
start = copy.deepcopy(task_length_guesses)
start['modelsigma'] = sigma_lam
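As an aside (not the question itself), the row-by-row loop above gets slow as record_number grows. A vectorized construction along these lines should produce the same kind of count matrix; this is just a sketch, using np.add.at to accumulate repeated tasks:

n_tasks = np.random.randint(3, 6, size=record_number)            # 3 to 5 tasks per day
row_idx = np.repeat(np.arange(record_number), n_tasks)           # one row index per completed task
col_idx = np.random.randint(0, task_number, size=n_tasks.sum())  # which task, repeats allowed
counts = np.zeros((record_number, task_number))
np.add.at(counts, (row_idx, col_idx), 1)                         # accumulates repeated (row, task) pairs
df[task_columns] = counts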
If I set task_number in the code above to something like 50, the code below works, because the compilation nesting doesn’t get too deep.
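To illustrate what I mean by nesting (my own mental model of the issue, not something from the docs): summing the terms one by one in a Python loop chains one addition node per task, so the graph depth grows linearly with task_number:

x = pt.scalar('x')
expr = x
for _ in range(300):
    expr = expr + x  # builds (((x + x) + x) + ...), one nested Add node per iteration

Anyway, here’s the version that works for a small task_number: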
def create_pymc_coef_prior(coef_mode: float, task: str) -> pm.Lognormal:
assert coef_mode > 0, f'task {task} has a non-positive standard hours value: {coef_mode}'
    lognormal_std = coef_mode / 3  # set the prior's std in lognormal space to 1/3 of the mode
    coef_sigma = np.sqrt(np.log(1 + (lognormal_std / coef_mode) ** 2))  # convert lognormal std to normal-space sigma
    assert coef_sigma > 0, f'task {task} has a non-positive sigma value: {coef_sigma}'
    coef_mu = np.log(coef_mode) + coef_sigma ** 2  # for a LogNormal, mu = ln(mode) + sigma^2
return pm.Lognormal(
name=f"task_{task}",
mu=coef_mu,
sigma=coef_sigma,
transform=None
)
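# Quick sanity check of the parametrization above (my own check, not part of the model):
# for a LogNormal, mode = exp(mu - sigma^2), so mu = ln(mode) + sigma^2 recovers the mode.
_mode = 2.0
_sigma = np.sqrt(np.log(1 + (1 / 3) ** 2))
_mu = np.log(_mode) + _sigma ** 2
assert np.isclose(np.exp(_mu - _sigma ** 2), _mode)  # mode of LogNormal(_mu, _sigma) is 2.0 again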
def create_pymc_coef_priors(task_columns: List[str], task_length_guesses: dict) -> List[pm.Lognormal]:
coef_priors = list()
for task_col in task_columns:
task = task_col.split('_')[1]
coef_mode = round(task_length_guesses[task_col], 2)
coef_prior = create_pymc_coef_prior(coef_mode=coef_mode, task=task)
coef_priors.append(coef_prior)
return coef_priors
# Define the model
with pm.Model() as model:
coef_priors = create_pymc_coef_priors(task_columns, task_length_guesses)
expected_work_hours = sum([coef_prior * df[task_col].values for coef_prior, task_col
in zip(coef_priors, task_columns)])
    # Exponential prior on the observation noise, and a Normal likelihood for the observed work hours
    model_sigma = pm.Exponential(name='modelsigma', lam=sigma_lam, transform=None)
    observed_workhours = pm.Normal(
        name='observed_workhours',
        mu=expected_work_hours,
        sigma=model_sigma,
        observed=df['time_taken'].values,
    )
trace = pm.sample(draws=2100, tune=1100, return_inferencedata=True, progressbar=True, cores=4,
init="adapt_diag", initvals=start)
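An aside before the actual problem: I suspect the Python-level sum above could be replaced by a single vectorized prior and a dot product, along these lines (an untested sketch, with the mu/sigma arithmetic precomputed in NumPy):

coef_modes = np.array([task_length_guesses[c] for c in task_columns])
lognormal_stds = coef_modes / 3
coef_sigmas = np.sqrt(np.log(1 + (lognormal_stds / coef_modes) ** 2))
coef_mus = np.log(coef_modes) + coef_sigmas ** 2
with pm.Model() as vec_model:
    coefs = pm.Lognormal('task_coefs', mu=coef_mus, sigma=coef_sigmas, shape=len(task_columns))
    expected = pt.dot(df[task_columns].values, coefs)  # one matrix-vector product, no deep nesting
    model_sigma = pm.Exponential('modelsigma', lam=sigma_lam)
    pm.Normal('observed_workhours', mu=expected, sigma=model_sigma,
              observed=df['time_taken'].values)

I mention this only in case it’s relevant; my actual question is about scan.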
However, I need the model to handle larger values of task_number, like 300 or 800. I’ve found some documentation on how I should be using pytensor.scan to get around this issue. Here’s the reference URL I’ve looked at:
https://pytensor.readthedocs.io/en/latest/library/scan.html
I did my best to follow the examples there, but I’m definitely still getting something wrong. Here’s the current version of my pytensor.scan code. Can someone tell me what I’m doing wrong here?
def step(coef_mode, task_array, acc):
# Create LogNormal prior for coefficient
lognormal_std = coef_mode / 3 # Set the standard deviation of prior in lognormal space to 1/3 of the mode
coef_sigma = np.sqrt(np.log(1 + (lognormal_std / coef_mode) ** 2)) # Convert lognormal std to normal std
    coef_mu = np.log(coef_mode) + coef_sigma ** 2  # for a LogNormal, mu = ln(mode) + sigma^2
coef_prior = pm.Lognormal(name='task_coeff', mu=coef_mu, sigma=coef_sigma, transform=None)
return acc + coef_prior * task_array
coef_modes_tensor = pt.as_tensor_variable(list(task_length_guesses.values()))
operation_arrays_tensor = pt.stack([pt.as_tensor_variable(df[task_col].values) for task_col in task_columns])
initial_acc = pt.zeros_like(operation_arrays_tensor[0], dtype='float64')
with pm.Model() as model:
# Use PyTensor's scan to compute expected workhours
expected_work_hours, _ = pytensor.scan(
fn=step,
sequences=[coef_modes_tensor, operation_arrays_tensor],
outputs_info=initial_acc
)
    # Exponential prior on the observation noise, and a Normal likelihood for the observed work hours
    model_sigma = pm.Exponential(name='modelsigma', lam=sigma_lam, transform=None)
    observed_workhours = pm.Normal(
        name='observed_workhours',
        mu=expected_work_hours[-1],  # we only need the final accumulated value
        sigma=model_sigma,
        observed=df['time_taken'].values,
    )
trace = pm.sample(draws=2100, tune=1100, return_inferencedata=True, progressbar=True, cores=4,
init="adapt_diag", initvals=start)
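For what it’s worth, my current suspicion is that the pm.Lognormal call inside step is the problem: every iteration of the scan would try to register a new random variable under the same name 'task_coeff', and as far as I understand, PyMC random variables can’t be created inside a scan step at all. If that’s right, I’d guess the fix is to define the coefficients outside of scan and let scan do only the deterministic accumulation, something like this (a guess, untested, reusing coef_mus and coef_sigmas from the vectorized sketch above):

def step(coef, task_array, acc):
    # purely deterministic: no random variables are created inside the scan step
    return acc + coef * task_array

with pm.Model() as model:
    coefs = pm.Lognormal('task_coefs', mu=coef_mus, sigma=coef_sigmas, shape=len(task_columns))
    accumulated, _ = pytensor.scan(
        fn=step,
        sequences=[coefs, operation_arrays_tensor],
        outputs_info=initial_acc,
    )
    model_sigma = pm.Exponential('modelsigma', lam=sigma_lam)
    pm.Normal('observed_workhours', mu=accumulated[-1], sigma=model_sigma,
              observed=df['time_taken'].values)

Is that the right direction, or am I misunderstanding how scan is supposed to interact with PyMC?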