CustomDist with multivariate nodes

Hi,

Apologies in advance if I have misunderstood something fundamental — I am still learning Python and PyMC. I am trying to fit a mixture model in which each component is a custom distribution whose probability density is the product of a multivariate normal density and several categorical densities, so the likelihood contribution for each individual i has the form:

f\left(y_i\right) = \sum_{k=1}^K w_k f\left(y_{i, 3:5} \mid \mu_k, \Sigma_k\right) f\left(y_{i, 1} \mid p_{1k}\right) f\left(y_{i, 2} \mid p_{2k}\right)

where p_{1k} and p_{2k} are probability vectors of length 2 and 3 (say), respectively, \mu_k is a vector of length 3, and \Sigma_k is a (3 x 3) matrix. I have coded it so that y_{i,1} and y_{i,2} are category indicators for two categorical random variables, and y_{i,3:5} is a 3-D vector from a multivariate normal distribution. I believe I have to encode it this way, since CustomDist only allows a single value argument.
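For a single observation, the component density above is just the product of two categorical probabilities and one MVN density, so its log is a sum of three log-densities. A minimal NumPy/SciPy sketch of one such contribution (all parameter values here are hypothetical, chosen only for illustration):

```python
import numpy as np
from scipy import stats

# hypothetical observation: [category 1, category 2, y_3, y_4, y_5]
y_i = np.array([1.0, 2.0, 0.3, -0.1, 1.2])

def comp_logdensity(y, p1, p2, mu, cov):
    # log f(y_1 | p1) + log f(y_2 | p2) + log f(y_{3:5} | mu, cov)
    lp = np.log(p1[int(y[0])])
    lp += np.log(p2[int(y[1])])
    lp += stats.multivariate_normal(mean=mu, cov=cov).logpdf(y[2:])
    return lp

lp = comp_logdensity(
    y_i,
    p1=np.array([0.2, 0.8]),
    p2=np.array([0.2, 0.5, 0.3]),
    mu=np.zeros(3),
    cov=np.eye(3),
)
```

The mixture log-likelihood would then be a log-sum-exp of log(w_k) plus this quantity over the K components, which is what pm.Mixture computes internally.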

I have tried various versions of this; the current attempt, using CustomDist, is below. At the moment I am getting broadcasting errors and am struggling to figure out the correct signature. A reprex is below.

If anyone has any thoughts on what I am doing wrong, I'd be grateful.

Many thanks.

## import packages
import pymc as pm
import numpy as np
import pytensor.tensor as pt
from pytensor.tensor import TensorVariable

## seed for reproducibility
rng = np.random.default_rng(1)

## simulate data
mu1 = [0, 1, 2]
sigma1 = [[1, 0.5, 0.5], [0.5, 1, 0.5], [0.5, 0.5, 1]]
mu2 = [2, 1, 0]
sigma2 = [[1, -0.5, -0.5], [-0.5, 1, -0.5], [-0.5, -0.5, 1]]
dataset1 = np.random.multivariate_normal(mu1, sigma1, size = 100)
dataset2 = np.random.multivariate_normal(mu2, sigma2, size = 100)
p11 = [0.2, 0.8]
dataset_cat11 = np.random.choice(2, size = 100, p = p11)
p12 = [0.4, 0.6]
dataset_cat12 = np.random.choice(2, size = 100, p = p12)
p21 = [0.2, 0.5, 0.3]
dataset_cat21 = np.random.choice(3, size = 100, p = p21)
p22 = [0.4, 0.1, 0.5]
dataset_cat22 = np.random.choice(3, size = 100, p = p22)
w = np.random.binomial(1, 0.3, size = 100)
dataset = np.concatenate([np.c_[dataset_cat11[w == 0], dataset_cat21[w == 0]], dataset1[w == 0]], axis = 1)
dataset = np.concatenate([dataset, np.concatenate([np.c_[dataset_cat12[w == 1], dataset_cat22[w == 1]], dataset2[w == 1]], axis = 1)], axis = 0)

## define stick-breaking function
def stick_breaking(nu):
  portion_remaining = pt.concatenate([[1], pt.extra_ops.cumprod(1 - nu)[:-1]])
  return nu * portion_remaining
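(As a quick sanity check of the stick-breaking construction, here is a plain-NumPy re-implementation of the function above, with an arbitrary nu vector whose final entry is 1, mirroring the pt.concatenate([nu, [1]]) step in the model; with that final 1 the weights sum to exactly 1:)

```python
import numpy as np

def stick_breaking_np(nu):
    # identical logic to the pytensor version: w_k = nu_k * prod_{j<k} (1 - nu_j)
    portion_remaining = np.concatenate([[1.0], np.cumprod(1 - nu)[:-1]])
    return nu * portion_remaining

nu = np.array([0.3, 0.5, 0.2, 0.4, 1.0])  # final break set to 1, as in the model
w = stick_breaking_np(nu)                 # non-negative weights summing to 1
```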

## set number of components
ncomp = 5
ncont = 3
ncat = 2

## set number of categories for each categorical variable
ncat_each = [2, 3]

## define custom likelihood function
def logp_comp(value: TensorVariable, mu: TensorVariable, chol: TensorVariable, p1: TensorVariable, p2: TensorVariable) -> TensorVariable:
  ## calculate log-likelihoods
  y1 = pm.Categorical.dist(p = p1)
  lp = pm.logp(y1, value[0])
  y2 = pm.Categorical.dist(p = p2)
  lp += pm.logp(y2, value[1])
  y3 = pm.MvNormal.dist(mu = mu, chol = chol)
  lp += pm.logp(y3, value[2:])
  return lp

## define model
with pm.Model() as model:
  ## setup data
  y_data = pm.Data("y_data", dataset)
    
  ## DP prior on mixing weights
  alpha = pm.Gamma("alpha", 1.0, 1.0)
  nu = pm.Beta("nu", 1.0, alpha, shape = ncomp - 1)
  nu1 = pt.concatenate([nu, [1]])
  w = pm.Deterministic("w", stick_breaking(nu1))
  
  ## priors on categorical weights
  p = [[pm.Dirichlet("dpriors" + str(i * ncomp + n), a = np.ones(ngroup)) for i, ngroup in enumerate(ncat_each)] for n in range(ncomp)]
  
  ## define priors for base distribution
  ## sd_dist must be tensor variable (hence .dist part)
  mu = [pm.Normal("mu" + str(i), mu = 0, sigma = 2.5, shape = ncont) for i in range(ncomp)]
  sd_dists = [pm.Exponential.dist(1.0, shape = ncont) for i in range(ncomp)]
  packed_chol = [pm.LKJCholeskyCov(
      "chol_cov" + str(i), 
      n = ncont, 
      eta = 1, 
      sd_dist = sd_dists[i], 
      compute_corr = False
    ) for i in range(ncomp)]
  chol = [pm.expand_packed_triangular(ncont, packed_chol[i], lower = True) for i in range(ncomp)]
  
  components = [pm.CustomDist(
    "components" + str(i),
    mu[i], chol[i], p[i][0], p[i][1],
    logp = logp_comp,
    observed = y_data) for i in range(ncomp)]
  y = pm.Mixture("y", w = w, comp_dists = components, observed = y_data)

  ## generate posterior samples
  trace = pm.sample(3000, random_seed = rng)

ValueError: Could not broadcast dimensions. Incompatible shapes were [(ScalarConstant(ScalarType(int64), data=np.int64(1)), ScalarConstant(ScalarType(int64), data=np.int64(3))), (ScalarConstant(ScalarType(int64), data=np.int64(3)), ScalarConstant(ScalarType(int64), data=np.int64(3))), (ScalarConstant(ScalarType(int64), data=np.int64(1)), ScalarConstant(ScalarType(int64), data=np.int64(2))), (ScalarConstant(ScalarType(int64), data=np.int64(1)), ScalarConstant(ScalarType(int64), data=np.int64(3)))].

I suggest you start from a simpler model and build up from there. One thing that is immediately wrong is that components passed to a Mixture should use pm.CustomDist.dist, not named variables with observed. The other is that you probably need to pass a signature to CustomDist, since it is not univariate: CustomDist — PyMC dev documentation

For example, the signature of a Dirichlet RV is (p)->(p), and that of an MvNormal is (m),(m,m)->(m).
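(For intuition: this is the same generalized-ufunc signature convention NumPy uses. The core dimensions of each parameter go on the left of ->, the core dimensions of one draw on the right, and any extra leading dimensions broadcast as batch dimensions. A small NumPy sketch of "(m),(m,m)->(m)":)

```python
import numpy as np

# a gufunc with signature "(m),(m,m)->(m)": a length-m vector and an
# (m, m) matrix map to a length-m vector; leading dims are batch dims
matvec = np.vectorize(lambda v, A: A @ v, signature="(m),(m,m)->(m)")

v = np.ones((4, 3))   # batch of 4 length-3 vectors
A = np.eye(3)         # a single (3, 3) matrix, broadcast over the batch
out = matvec(v, A)
print(out.shape)      # (4, 3)
```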

Thanks for the quick reply. I did have the .dist version earlier on, but missed it here — my apologies. Thanks also for the advice on the signature. The current version below starts running the NUTS sampler before failing with an input dimension mismatch error. I'll keep plugging away at it. Many thanks.

components = [pm.CustomDist.dist(
  mu[i], chol[i], p[i][0], p[i][1],
  logp = logp_comp,
  signature = '(3),(3,3),(2),(3)->(5)') for i in range(ncomp)]

Thanks for your help. The following code seems to work now. I had to set the signature correctly and then figure out how to index the correct parts of the value TensorVariable in the logp function. I also needed to add a random-number function. I am not 100% convinced this is correct, since it was mainly developed through trial and error, but it returns the expected posterior predictive distribution when tested. Cheers!


## set number of components
ncomp = 5
ncont = 3
ncat = 2

## set number of categories for each categorical variable
ncat_each = [2, 3]

## create signature, e.g. "(2),(3),(3),(3,3)->(5)"
sig_comp = ",".join(f"({n})" for n in ncat_each)
sig_comp += f",({ncont}),({ncont},{ncont})->({ncont + ncat})"

## define custom likelihood function
def logp_comp(value: TensorVariable, p1: TensorVariable, p2: TensorVariable, mu: TensorVariable, chol: TensorVariable) -> TensorVariable: 
  ## calculate log-likelihoods
  y1 = pm.Categorical.dist(p = p1)
  y2 = pm.Categorical.dist(p = p2)
  lp = pm.logp(y1, value[:,0])
  lp += pm.logp(y2, value[:,1])
  y3 = pm.MvNormal.dist(mu = mu, chol = chol)
  lp += pm.logp(y3, value[:,2:])
  return lp
  
from typing import Optional, Tuple

def random_comp(
  p1: np.ndarray | float,
  p2: np.ndarray | float,
  mu: np.ndarray | float,
  chol: np.ndarray | float,
  rng: Optional[np.random.Generator] = None,
  size: Optional[Tuple[int]] = None,
) -> np.ndarray | float:
  ## use the supplied generator so draws are reproducible
  if rng is None:
    rng = np.random.default_rng()
  ## parameters arrive with a leading batch dimension, hence the [0] indexing
  r1 = rng.choice(p1.size, size = size, p = p1[0])
  r2 = rng.choice(p2.size, size = size, p = p2[0])
  r3 = rng.multivariate_normal(mean = mu[0], cov = chol[0] @ chol[0].T, size = size)
  return np.concatenate([np.stack((r1, r2), axis = 1), r3], axis = 1)

## define model
with pm.Model() as model:
  ## setup data
  y_data = pm.Data("y_data", dataset)
    
  ## DP prior on mixing weights
  alpha = pm.Gamma("alpha", 1.0, 1.0)
  nu = pm.Beta("nu", 1.0, alpha, shape = ncomp - 1)
  nu1 = pt.concatenate([nu, [1]])
  w = pm.Deterministic("w", stick_breaking(nu1))
  
  ## priors on categorical weights
  p = [[pm.Dirichlet("dpriors" + str(i * ncomp + n), a = np.ones(ngroup)) for i, ngroup in enumerate(ncat_each)] for n in range(ncomp)]
  
  ## define priors for base distribution
  ## sd_dist must be tensor variable (hence .dist part)
  mu = [pm.Normal("mu" + str(i), mu = 0, sigma = 2.5, shape = ncont) for i in range(ncomp)]
  sd_dists = [pm.Exponential.dist(1.0, shape = ncont) for i in range(ncomp)]
  packed_chol = [pm.LKJCholeskyCov(
      "chol_cov" + str(i), 
      n = ncont, 
      eta = 1, 
      sd_dist = sd_dists[i], 
      compute_corr = False
    ) for i in range(ncomp)]
  chol = [pm.expand_packed_triangular(ncont, packed_chol[i], lower = True) for i in range(ncomp)]
  
  components = [pm.CustomDist.dist(
    p[i][0], p[i][1], mu[i], chol[i],
    logp = logp_comp,
    random = random_comp,
    signature = sig_comp) for i in range(ncomp)]
  y = pm.Mixture("y", w = w, comp_dists = components, observed = y_data)

  ## generate posterior samples
  trace = pm.sample(3000)
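(For reference, one cheap way to check a generator like random_comp, independently of sampling, is a standalone NumPy version and an assertion on the draw layout. The sketch below mirrors random_comp with hypothetical parameter values and checks that each draw is a length-5 row: two category indicators followed by three continuous values.)

```python
import numpy as np

def random_comp_np(p1, p2, mu, chol, rng=None, size=None):
    # mirrors random_comp: two categorical draws plus a 3-D MVN draw,
    # concatenated into one length-5 row per sample
    rng = np.random.default_rng() if rng is None else rng
    r1 = rng.choice(p1.size, size=size, p=p1)
    r2 = rng.choice(p2.size, size=size, p=p2)
    r3 = rng.multivariate_normal(mean=mu, cov=chol @ chol.T, size=size)
    return np.concatenate([np.stack((r1, r2), axis=1), r3], axis=1)

draws = random_comp_np(
    np.array([0.2, 0.8]), np.array([0.2, 0.5, 0.3]),
    np.zeros(3), np.eye(3), rng=np.random.default_rng(1), size=10,
)
print(draws.shape)   # (10, 5)
```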