# Mixture of Censored iid Normals

Continuing on from here:

where I asked about mixture of normals, thanks to @jessegrabowski and @ricardoV94, I have come to realize that due to sampling methods, in mixture models, MvNormals with diagonal cov is not identical to a bunch of univariate normals and actually can sample more efficiently (somewhat remedying label mixing if not completely).

However as it stands out, MvNormals is not applicable to where I want to use the mixture model in. Following again from @ricardoV94’s question I wanted to open a seperate topic for this. A summary of how the data would look like in may case can be described as follows:

1- Assume we are given a cluster’s center coordinates, a n dimensional vector, values between 0 to 10.
2- Any data i around this center is given as center + bias_i + (iid noise on each coordinate)_i. Moreover there is thresholding, any value below 1 is set 1.

Here is how I would generate it:

``````import numpy as np
import pymc as pm
import pytensor.tensor as ptt
import matplotlib.pyplot as plt

def plot_clusters(centers, data, max_val):

nclusters = centers.shape
fig,ax = plt.subplots(1, nclusters, figsize=(5*nclusters, 5))

for i in range(nclusters):
ax[i].plot(centers[i,:], color="red", zorder=1)
ax[i].plot(data[i].T, color="black", alpha=0.3, zorder=0)
ax[i].set_ylim(0, max_val+3)
ax[i].set_yticks(range(0, max_val+3))
ax[i].grid("on", alpha=0.3)

ndims = 7
nclusters = 5
max_val = 8
lower = 1
bias_sd = 1
noise_sd = 0.5
upper = np.inf
seed = 0

rng = np.random.default_rng(seed)
ndata_per_cluster = rng.integers(20,50, size=nclusters)
ndata = int(np.sum(ndata_per_cluster))

cluster_centers = rng.random((nclusters, ndims))*max_val
data = []

for i in range(nclusters):
biases = rng.normal(0, bias_sd, (ndata_per_cluster[i],))[:,None]
noise = rng.normal(0, noise_sd, (ndata_per_cluster[i], ndims))
cluster_data = cluster_centers[i:i+1,:] + biases + noise
cluster_data[cluster_data<lower] = lower
data.append(cluster_data)

plot_clusters(cluster_centers, data,  max_val)
data = np.concatenate(data)
``````

So the model that I tried to use initially was as such (assuming I get init and center_mus from k-means or sth but for now I will use the true centers):

``````#normally init would come from spectral or k-means clustering with some
#bias correction on the data
cluster_centers = cluster_centers[np.argsort(cluster_centers[:,0]),:]
cluster_centers[cluster_centers<lower] = lower
init = [cluster_centers[:,0], cluster_centers[:,1:]]
center_mus =  [cluster_centers[:,0], cluster_centers[:,1:]]
mu_sigma = 1
nclusters_fit = nclusters
bias_sd = 2
conc = 10

with pm.Model() as model:

μ0 = pm.Normal("μ0",
mu=center_mus,
sigma=mu_sigma,
shape=nclusters_fit,
transform=pm.distributions.transforms.ordered,
initval=init)

μ1 = pm.Normal("μ1",
mu=center_mus,
sigma=mu_sigma,
shape=(nclusters_fit, ndims-1),
initval=init)

μ = pm.Deterministic("μ", ptt.concatenate([μ0[:,None], μ1], axis=1))
σ = pm.InverseGamma("σ", 2, 1)
biases = pm.Normal("biases", 0, bias_sd, shape=(ndata))
weights = pm.Dirichlet("w", conc*np.ones(nclusters_fit))
components =\
[pm.Normal.dist(μ[i,:][None,:]+biases[:,None], σ) for i in range(nclusters_fit)]
mix = pm.Mixture.dist(w=weights, comp_dists=components)

pm.Censored("likelihood", mix, lower=lower, upper=upper, observed=data)

trace = pm.sample(draws=2000, tune=1000, chains=6, target_accept=0.95)
``````

There is quite a bit of cheating here I realize. But I did this as a proof of concept and even though I supplied censored centres as initials, the model correctly guesses centre components that are below 1. So censoring is indeed doing what it is supposed to be doing. Multi-modality of posteriors is not horrible, some here and there. But I know again I am cheating here by supplying true centres though in principle centers from k-means or spectral clustering work just as fine (after some preprocessing for the bias).

The question is how to make this model less prone to centre switching without cheating? I can still supply initials from k-means etc maybe but seems like non-mixing property of MvNormal would also be a quite boon in this direction.

Note that I am fine with using a MvNormal with a diagonal cov in which components are independent (here they are actually iid). In principle I guess one could custom define censored for such a MvNormal. Whenever one component is below lower, likelihood would be CDF of that component multiplied by PDF of the rest. But

1- I don’t know really where to start, I don’t know inner workings of pymc that well to understand how to construct this,
2- If I define a custom distribution like this, would I be again beating the non-mixing of MvNormal?

ps: As a side note, if I deal with less number of clusters such as 3, just supplying the initial values as centers (say from k-means) but keeping the center_mus vague (i.e all 4) with large mu_sigma like 5 also works fine when using init=‘advi+adapt_diag’. Haven’t now tested without advi but feel like that makes a positive impact based on previous experience.

If you are not afraid of trying something more experimental, `MarginalModel` in pymc-experimental, may do what you need: MarginalModel — pymc_experimental 0.0.13 documentation

``````import pymc as pm
import numpy as np

from pymc_experimental import MarginalModel
from pymc.distributions.transforms import ordered

n_clusters = 5
coords = {
"cluster": range(n_clusters),
"ndim": ("x", "y"),
"obs": range(10),
}
with MarginalModel(coords=coords) as m:
idx = pm.Categorical("idx", p=np.ones(n_clusters) / n_clusters, dims=("obs",))

mu_x = pm.Normal("mu_x", dims=("cluster",), transform=ordered, initval=np.linspace(-1, 1, n_clusters))
mu_y = pm.Normal("mu_y", dims=("cluster",))
mu = pm.math.concatenate([mu_x[..., None], mu_y[..., None]], axis=-1)

sigma = pm.HalfNormal("sigma")

def dist(idx, mu, sigma, lower, upper, _):
# Define a mixture where the output x, y depends on the value of idx
rv = pm.Censored.dist(pm.Normal.dist(mu[:, None], sigma), lower=lower, upper=upper, shape=data.shape)
for i in range(n_clusters - 1):
rv = pm.math.where(
pm.math.eq(idx, i),
rv,
pm.Censored.dist(pm.Normal.dist(mu[i + 1][:, None], sigma), lower=lower, upper=upper, shape=data.shape),
)
return rv

lower = -3
upper = 3
data = np.array([[-1, -1], [0, 0], [1, 1], [2, 2], [-3, -3]] * 2).T.astype("float")
censored_mix = pm.CustomDist(
"censored_mix",
idx, mu, sigma, lower, upper,
dist=dist,
observed=data,
dims=("ndim", "obs"),
)

# Marginalize away the idx, creating a Mixture likelihood
m.marginalize(idx)

with m:
idata = pm.sample()
``````

Note that we explicitly control how the weight are used, so that the pairs of x, y are related to the same `idx` variable, and therefore the logp should be equivalent to a “CensoredMvNormal Mixture”

Disclaimer: I didn’t try to actually sample.

2 Likes

Cutting edge technology? Count me in. I will let you know if I manage to get it running and get some results. Thanks

3 Likes