Hey folks,
I’ve been banging my head on this for a while, and it also seems to have ChatGPT thoroughly stumped, so I thought I’d impose upon the human experts here for some assistance.
Below is a model that is complicated, but which I’m nonetheless fairly confident represents a reprex of the problem.
Error messages encountered:
When I attempt to compile with compile_kwargs = {"mode": "NUMBA"}, I get:
pytensor.graph.utils.InconsistencyError: Multiple destroyers of joined_inputs
and with compile_kwargs = {"mode": "FAST_COMPILE"} I get:
AttributeError: 'Scratchpad' object has no attribute 'ufunc'
Model Description:
The model starts with [n_obs, n_var] matrices A_t1 and B_t1, and for each a multivariate normal is used to impute missing values. The new matrices with the imputed values are then used as predictors in a Hilbert-space Gaussian process regression in which each of the n_var columns is associated with a 1d-input/2d-output HSGP, and these are summed to yield the prediction for a given row. Finally, the HSGP regression is repeated to model heteroskedasticity.
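For reference, the hand-coded HSGP below is, per input column, the standard Hilbert-space approximation for a squared-exponential kernel (the output amplitudes and intercepts are applied further downstream), roughly:

$$
f(x) \approx \sum_{m=1}^{M} (2\pi)^{1/4}\sqrt{\ell}\,\exp\!\left(-\frac{\ell^{2}}{4}\left(\frac{\pi m}{2L}\right)^{2}\right)\frac{1}{\sqrt{L}}\sin\!\left(\frac{\pi m\,(x+L)}{2L}\right)\beta_m,\qquad \beta_m \sim \mathcal{N}(0,1)
$$

where ℓ is the lengthscale, L = boundary_factor · max|x| per column, and M = n_basis_fns.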
Model changes that do not affect the manifestation of the issue:
- Using the official GP machinery rather than my hand-coded HSGP (see the sketch after this list)
- Using hand-coded missing value imputation rather than automatic imputation
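(By "official GP machinery" I mean PyMC's built-in pm.gp.HSGP; per input column it was roughly the sketch below — not the exact code I ran, and the priors are placeholders.)

# rough per-column sketch of the pm.gp.HSGP version (placeholder priors)
ell = pm.Weibull("ell", alpha=2, beta=1)
cov_func = pm.gp.cov.ExpQuad(1, ls=ell)
gp = pm.gp.HSGP(m=[n_basis_fns], c=boundary_factor, cov_func=cov_func)
f = gp.prior("f", X=A_t1_complete[:, 0:1])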
Model changes that individually each resolve the issue:
- Removing the missing data imputation
- Removing the heteroskedasticity HSGP and using a scalar for sigma in the likelihoods (see the sketch after this list)
- Keeping the heteroskedasticity HSGP but removing the HSGP on the mean and using a scalar for mu in the likelihoods
- Reducing the model so that there’s no B_t1/B_t2 and the HSGPs are 1d-output
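For concreteness, the scalar-sigma variant just replaces the second hsgp_lm() call with the commented-out scalar priors visible in the model code below, roughly:

# scalar-noise variant (sketch); the likelihoods keep sigma = pm.math.exp(.5 * A_sigma)
A_sigma = pm.Normal("A_sigma", mu=0, sigma=1.0)
B_sigma = pm.Normal("B_sigma", mu=0, sigma=1.0)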
So I get the feeling that there is some interaction between all these moving parts (possibly the mere size of the graph?) that is causing the issue. Any thoughts?
Model and code to generate synthetic data:
########
# imports ----
########
import numpy as np
import pymc as pm
import pytensor.tensor as pt
from pytensor import scan
########
# user-defined functions ----
########
def mvn_with_imputation(name, x, n_var, is_missing, n_missing, auto_impute=True):
    """
    Construct a multivariate normal distribution with imputation for missing values.
    Parameters
    ----------
    name : str
        The name prefix for the variables.
    x : array-like
        The observed data, shape (n_x, n_var).
    n_var : int
        The number of variables (columns) in the data.
    is_missing : array-like
        A boolean array indicating which values in `x` are missing.
    n_missing : int
        The number of missing values in `x`.
    auto_impute : bool, optional
        If True, rely on pymc to automatically impute missing values.
    """
    mu = pm.Normal(f"{name}_mu", mu=0, sigma=1.0, shape=n_var)
    sd_dist = pm.Weibull.dist(alpha=2, beta=1, size=n_var)
    chol, cor, sd = pm.LKJCholeskyCov(
        f"{name}_cov"
        , n = n_var
        , eta = 1.0
        , sd_dist = sd_dist
        # , compute_corr = False
    )
    if auto_impute:
        complete = pm.MvNormal(
            f"{name}_mvn"
            , mu = mu
            , chol = chol
            , observed = x
        )
        return complete
    else:
        # hand-coded imputation: free parameters for the missing cells, then an explicit MvNormal logp
        imputed = pm.Normal(f"{name}_imputed", shape=n_missing)
        complete = pt.tensor_copy(x)
        complete = pt.set_subtensor(
            complete[is_missing]
            , imputed
        )
        pm.Potential(
            f"{name}_mvn_logp"
            , pm.logp(
                pm.MvNormal.dist(
                    mu = mu
                    , cov = pt.matmul(chol, pt.swapaxes(chol, -1, -2))
                )
                , complete
            )
        )
        return complete
def get_PHI_L(x, n_basis_fns, boundary_factor, drop_first=False):
    """
    Construct the basis functions for the latent space.
    Parameters
    ----------
    x : array-like
        The input data, shape (n_x, n_col).
    n_basis_fns : int
        The number of basis functions to compute.
    boundary_factor : float
        A factor to scale the boundary condition.
    drop_first : bool, optional
        If True, the first basis function is dropped.
    Returns
    -------
    PHI : array-like
        The computed basis functions, shape (n_x, M_eff, n_col).
    L : array-like
        The boundary condition for each column, shape (n_col,).
    """
    # if we plan to drop the first basis, bump up M so we can slice it away
    if drop_first:
        n_basis_fns += 1
    # raw basis indices 1,2,...,M
    m_raw = pt.arange(1, n_basis_fns + 1)
    # effective m = m_raw[1:] if dropping, else m_raw
    m = m_raw[1:] if drop_first else m_raw  # shape (M_eff,)
    # for each column, boundary L = boundary_factor * max|x[:,c]|
    L = boundary_factor * pt.max(pt.abs(x), axis=0)  # shape (n_col,)
    # helper A = π/(2L) for each column
    A = pt.pi / (2 * L)  # shape (n_col,)
    # reshape for broadcast:
    #   x_exp (n_x, n_col, 1)
    #   L_exp (1, n_col, 1)
    #   A_exp (1, n_col, 1)
    #   m_exp (1, 1, M_eff)
    x_exp = x[:, :, None]
    L_exp = L[None, :, None]
    A_exp = A[None, :, None]
    m_exp = m[None, None, :]
    # sin( A * (x + L) * m ) / sqrt(L), then transpose to (n_x, M_eff, n_col)
    PHI = pt.sin(A_exp * (x_exp + L_exp) * m_exp) / pt.sqrt(L_exp)
    return PHI.transpose(0, 2, 1), L
def compute_basis_helper(L, n_basis_fns, drop_first=True):
    """
    Compute the basis helper functions.
    Parameters
    ----------
    L : array-like
        The boundary condition for each column, shape (n_col,).
    n_basis_fns : int
        The number of basis functions to compute.
    drop_first : bool, optional
        If True, the first basis function is dropped.
    Returns
    -------
    basis_helper : array-like
        The computed basis helper functions, shape (M_eff, n_col).
    """
    if drop_first:
        n_basis_fns += 1
    # raw idx 1..M
    m_raw = pt.arange(1, n_basis_fns + 1)
    # effective m = m_raw[1:] if dropping, else m_raw
    m = m_raw[1:] if drop_first else m_raw
    A = (pt.pi / (2 * L))[None, :]  # shape (1, n_col)
    m_mat = m[:, None]              # shape (M_eff, 1)
    # broadcast → (M_eff, n_col)
    return -0.25 * (A**2) * (m_mat**2)
def hsgp_z(lengthscale, beta, PHI_data, basis_helper_data, sqrt_tau):
    """
    Compute the latent variable z for the Hilbert-space Gaussian process.
    Parameters
    ----------
    lengthscale : array-like
        The lengthscale parameter, shape (n_var,).
    beta : array-like
        The beta coefficients, shape (n_basis_fns, n_var).
    PHI_data : array-like
        The basis functions evaluated at the data, shape (n_x, n_basis_fns, n_col).
    basis_helper_data : array-like
        The basis helper functions, shape (n_basis_fns, n_col).
    sqrt_tau : float
        The square root of the tau parameter.
    Returns
    -------
    z : array-like
        The computed latent variable z, shape (n_x, n_var).
    """
    sqrt_term = pt.sqrt(lengthscale * sqrt_tau)
    exp_term = pt.exp(lengthscale * lengthscale * basis_helper_data)
    weights = sqrt_term * exp_term * beta  # shape (n_basis_fns, n_var)
    return pt.sum(PHI_data * weights[pt.newaxis, :, :], axis=1)
def hsgp_lm(name, A_PHI, A_basis_helper, A_n_obs, B_PHI, B_basis_helper, n_var, sqrt_tau):
    """
    Compute the latent variables for the Gaussian process latent variable model.
    Parameters
    ----------
    name : str
        The name prefix for the variables.
    A_PHI : array-like
        The basis functions evaluated at A_t1_pred, shape (n_x, n_basis_fns, n_col).
    A_basis_helper : array-like
        The basis helper functions for A, shape (n_basis_fns, n_col).
    A_n_obs : int
        The number of observations in A.
    B_PHI : array-like
        The basis functions evaluated at B_t1_pred, shape (n_x, n_basis_fns, n_col).
    B_basis_helper : array-like
        The basis helper functions for B, shape (n_basis_fns, n_col).
    n_var : int
        The number of variables (columns) in the data.
    sqrt_tau : float
        The square root of the tau parameter.
    Returns
    -------
    A_mu, B_mu : list of array-like
        The computed latent variables for A and B, each shape (n_x,).
    """
    f_lengthscale = pm.Weibull(f"{name}_lengthscale", alpha=2, beta=1, shape=n_var)
    # note: n_basis_fns is read from the enclosing (global) scope
    A_beta = pm.Normal(f"{name}_A_beta", mu=0, sigma=1.0, shape=(n_basis_fns, n_var))
    B_beta = pm.Normal(f"{name}_B_beta", mu=0, sigma=1.0, shape=(n_basis_fns, n_var))
    z0 = hsgp_z(f_lengthscale, A_beta, A_PHI, A_basis_helper, sqrt_tau).T
    z1 = hsgp_z(f_lengthscale, B_beta, B_PHI, B_basis_helper, sqrt_tau).T
    r = pm.Uniform(f"{name}_r", lower=0, upper=1, shape=[1, n_var])
    A_z = z0[:, :A_n_obs].T  # just those values evaluated at A_t1_pred
    B_z = (
        ( z0[:, A_n_obs:].T * r )  # z0 evaluated at B_t1_pred
        + ( z1.T * pm.math.sqrt(1 - r * r) )
    )
    f_amplitude_center = pm.Normal(f"{name}_f_amplitude_center", mu=0, sigma=1.0, shape=[1, n_var])
    f_amplitude_delta = pm.HalfNormal(f"{name}_f_amplitude_delta", sigma=1.0, shape=[1, n_var])
    A_f_amplitude = pm.math.exp(.5 * (f_amplitude_center + f_amplitude_delta))
    B_f_amplitude = pm.math.exp(.5 * (f_amplitude_center - f_amplitude_delta))
    f_intercept_center = pm.Normal(f"{name}_f_intercept_center", mu=0, sigma=1.0, shape=[1, n_var])
    f_intercept_delta = pm.HalfNormal(f"{name}_f_intercept_delta", sigma=1.0, shape=[1, n_var])
    A_f_intercept = f_intercept_center + f_intercept_delta
    B_f_intercept = f_intercept_center - f_intercept_delta
    intercept_center = pm.Normal(f"{name}_intercept_center", mu=0, sigma=1.0)
    intercept_delta = pm.HalfNormal(f"{name}_intercept_delta", sigma=1.0)
    A_intercept = intercept_center + intercept_delta
    B_intercept = intercept_center - intercept_delta
    A_mu = (
        A_intercept
        + pt.sum(
            A_z * A_f_amplitude + A_f_intercept
            , axis=1
        )
    )
    B_mu = (
        B_intercept
        + pt.sum(
            B_z * B_f_amplitude + B_f_intercept
            , axis=1
        )
    )
    return [A_mu, B_mu]
########
# import & structure the data ----
########
n_obs = 100
n_var = 3
A_t1_np = np.random.normal(size=(n_obs, n_var)) # Simulated data for t1
B_t1_np = np.random.normal(size=(n_obs, n_var)) # Simulated data for t1
A_t2_np = np.random.normal(size=n_obs) # Simulated data for t2
B_t2_np = np.random.normal(size=n_obs) # Simulated data for t2
# set 10% of t1 to NaN
n_nan = int(0.1 * n_obs * n_var)
A_t1_missing_idx = np.random.choice(n_obs * n_var, n_nan, replace=False)
A_t1_flat = A_t1_np.flatten()
A_t1_flat[A_t1_missing_idx] = np.nan
A_t1_with_missing_np = A_t1_flat.reshape(n_obs, n_var)
B_t1_nan_idx = np.random.choice(n_obs * n_var, n_nan, replace=False)
B_t1_flat = B_t1_np.flatten()
B_t1_flat[B_t1_nan_idx] = np.nan
B_t1_with_missing_np = B_t1_flat.reshape(n_obs, n_var)
# drop rows in A_t1_with_missing_np and B_t1_with_missing_np that have all NaN values
A_t1_with_missing_np = A_t1_with_missing_np[~np.all(np.isnan(A_t1_with_missing_np), axis=1)]
B_t1_with_missing_np = B_t1_with_missing_np[~np.all(np.isnan(B_t1_with_missing_np), axis=1)]
A_t1_is_missing = np.isnan(A_t1_with_missing_np)
A_t1_n_missing = int(np.sum(A_t1_is_missing))
B_t1_is_missing = np.isnan(B_t1_with_missing_np)
B_t1_n_missing = int(np.sum(B_t1_is_missing))
boundary_factor = 4
n_basis_fns = 20
sqrt_tau = np.sqrt(2 * np.pi)
########
# express the model ----
########
with pm.Model() as model:
    A_t2_Data = pm.Data("A_t2_Data", A_t2_np)
    B_t2_Data = pm.Data("B_t2_Data", B_t2_np)
    A_t1_complete = mvn_with_imputation(
        "A_t1"
        , x = A_t1_with_missing_np
        , n_var = n_var
        , is_missing = A_t1_is_missing
        , n_missing = A_t1_n_missing
    )
    B_t1_complete = mvn_with_imputation(
        "B_t1"
        , x = B_t1_with_missing_np
        , n_var = n_var
        , is_missing = B_t1_is_missing
        , n_missing = B_t1_n_missing
    )
    A_t1_PHI, A_t1_L = get_PHI_L(pt.concatenate([A_t1_complete, B_t1_complete], axis=0), n_basis_fns, boundary_factor)
    A_t1_basis_helper = compute_basis_helper(A_t1_L, n_basis_fns)
    B_t1_PHI, B_t1_L = get_PHI_L(B_t1_complete, n_basis_fns, boundary_factor)
    B_t1_basis_helper = compute_basis_helper(B_t1_L, n_basis_fns)
    # A_mu = pm.Normal("A_mu", mu=0, sigma=1.0)
    # B_mu = pm.Normal("B_mu", mu=0, sigma=1.0)
    A_mu, B_mu = hsgp_lm(
        name = "mu"
        , A_PHI = A_t1_PHI
        , A_basis_helper = A_t1_basis_helper
        , A_n_obs = A_t1_complete.shape[0]
        , B_PHI = B_t1_PHI
        , B_basis_helper = B_t1_basis_helper
        , n_var = n_var
        , sqrt_tau = sqrt_tau
    )
    # A_sigma = pm.Normal("A_sigma", mu=0, sigma=1.0)
    # B_sigma = pm.Normal("B_sigma", mu=0, sigma=1.0)
    A_sigma, B_sigma = hsgp_lm(
        name = "sigma"
        , A_PHI = A_t1_PHI
        , A_basis_helper = A_t1_basis_helper
        , A_n_obs = A_t1_complete.shape[0]
        , B_PHI = B_t1_PHI
        , B_basis_helper = B_t1_basis_helper
        , n_var = n_var
        , sqrt_tau = sqrt_tau
    )
    A_t2_obs = pm.Normal(
        "A_t2_obs"
        , mu = A_mu
        , sigma = pm.math.exp(.5 * A_sigma)
        , observed = A_t2_Data
    )
    B_t2_obs = pm.Normal(
        "B_t2_obs"
        , mu = B_mu
        , sigma = pm.math.exp(.5 * B_sigma)
        , observed = B_t2_Data
    )
# model.debug() # no errors
with model:
    prior = pm.sample_prior_predictive(
        samples = 1
        , random_seed = 112358
    )
    pm.set_data({
        "A_t2_Data": prior.prior_predictive.A_t2_obs.to_numpy().squeeze()
        , "B_t2_Data": prior.prior_predictive.B_t2_obs.to_numpy().squeeze()
    })
with model:
    posterior_trace = pm.sample(
        # compile_kwargs = {"mode": "NUMBA"}
        compile_kwargs = {"mode": "FAST_COMPILE"}
    )