Covariance Estimation including uncertainties

I have a variable X with shape (100, 2) and uncertainties X_error of the same shape (100, 2). I would like to estimate the covariance matrix between the two features X[:, 0] and X[:, 1]. How can I do that using an LKJCholesky prior and MvNormal (or something else)? And how do I incorporate X_error into the code below?

with pm.Model() as model:
    chol, corr, stds = pm.LKJCholeskyCov(
        "chol", n=2, eta=2.0, sd_dist=pm.Exponential.dist(1.0), compute_corr=True
    )
    cov = pm.Deterministic("cov", chol.dot(chol.T))
    μ = pm.Normal("μ", 0.0, 10, shape=2, testval=X.mean(axis=0))
    obs = pm.MvNormal("obs", μ, chol=chol, shape=(len(X),2), observed=X)

Hi Hitesh,

I know it’s been a while since you posted this, so I’m not sure it’s still of interest to you, but I think you could mostly keep your model and modify it as follows:

with pm.Model() as model:
    chol, corr, stds = pm.LKJCholeskyCov(
        "chol", n=2, eta=2.0, sd_dist=pm.Exponential.dist(1.0), compute_corr=True
    )
    cov = pm.Deterministic("cov", chol.dot(chol.T))
    μ = pm.Normal("μ", 0.0, 10, shape=2, testval=X.mean(axis=0))
    X_latent = pm.MvNormal("X_latent", μ, chol=chol, shape=(len(X),2))
    obs = pm.Normal("obs", X_latent, X_error, observed=X)

What this model says is that there’s a latent matrix X_latent containing the correlated features, but rather than observing it directly, you observe it with some (known) error added. Let me know if that makes sense.
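
In case it helps, here is a minimal numpy sketch of the generative story this model assumes; the true covariance and error scale below are made up, just to make the roles of X_latent and X_error concrete:

import numpy as np

rng = np.random.default_rng(42)

# Made-up "true" mean and covariance of the latent features
true_mu = np.array([1.0, -0.5])
true_cov = np.array([[1.0, 0.6],
                     [0.6, 2.0]])

# Latent correlated features (this is what LKJCholeskyCov + MvNormal describe)
X_latent_true = rng.multivariate_normal(true_mu, true_cov, size=100)

# Known per-feature measurement errors (standard deviations), shape (100, 2)
X_error = np.full((100, 2), 0.3)

# Observed data = latent values plus independent Gaussian noise
X = X_latent_true + rng.normal(0.0, X_error)

Fitting the model to X generated this way should recover something close to true_cov in the "cov" deterministic.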

Best wishes,
Martin

Hello Martin,
Thanks a lot for the solution.

What can I do if I want to include a full covariance matrix per observation?
That is, I have a variable X shaped (100, 2) and a per-observation covariance matrix X_cov with shape (100, 2, 2).

How can I incorporate X_cov into the model above?
Thanks.

I believe you could get what you want by raveling X into a (200,) vector and using a single 200 x 200 covariance matrix with the 2x2 covariance matrices on the (block) diagonal.

Actually setting this up in PyMC is slightly complex because aesara doesn’t have a dedicated block_diag function like scipy.linalg.block_diag. I use a scan here, but in my experience scans are awful and should be avoided. I’ve also done it with aesara.sparse.CSR pretty painlessly, but I don’t know where I put that code. No idea which is better, beyond my generally bad experiences with large scans.

import aesara
import aesara.tensor as at
import numpy as np

def allocate_block(A, out, r_start, c_start, r_stride, c_stride):
    # Write the current 2D block A into out at (r_start, c_start)
    row_slice = slice(r_start, r_start + r_stride)
    col_slice = slice(c_start, c_start + c_stride)

    next_r = r_start + r_stride
    next_c = c_start + c_stride

    return at.set_subtensor(out[row_slice, col_slice], A), next_r, next_c

def block_diag(arr: at.tensor3):
    # arr has shape (n, rows, cols); the result is (n * rows, n * cols)
    n, rows, cols = arr.shape
    out = at.zeros((n * rows, n * cols))

    result, _ = aesara.scan(allocate_block,
                            sequences=[arr],
                            outputs_info=[out,
                                          at.zeros((), dtype='int64'),
                                          at.zeros((), dtype='int64')],
                            non_sequences=[rows.astype('int64'), cols.astype('int64')])
    # The accumulated matrix after the final step is the full block-diagonal result
    return result[0][-1]

A quick test:

arr = at.tensor3('cov_matrices')
with np.printoptions(linewidth=1000, precision=3, suppress=True):
    print(block_diag(arr).eval({arr:np.random.normal(size=(5,2,2))}))
[[ 1.518 -0.084  0.     0.     0.     0.     0.     0.     0.     0.   ]
 [-0.108 -0.433  0.     0.     0.     0.     0.     0.     0.     0.   ]
 [ 0.     0.    -0.08   1.759  0.     0.     0.     0.     0.     0.   ]
 [ 0.     0.     0.599  2.     0.     0.     0.     0.     0.     0.   ]
 [ 0.     0.     0.     0.    -1.505 -1.356  0.     0.     0.     0.   ]
 [ 0.     0.     0.     0.     0.706 -1.205  0.     0.     0.     0.   ]
 [ 0.     0.     0.     0.     0.     0.    -1.046  0.054  0.     0.   ]
 [ 0.     0.     0.     0.     0.     0.    -1.686 -0.323  0.     0.   ]
 [ 0.     0.     0.     0.     0.     0.     0.     0.     1.776  0.464]
 [ 0.     0.     0.     0.     0.     0.     0.     0.    -0.598  0.487]]

So as you can see, you get this block-diagonal matrix where the two features within each observation are correlated with each other but independent of everything else. I believe this corresponds to the situation you want. The PyMC code is straightforward at this point: somehow sample 100 covariance matrices, stack them into a (100, 2, 2) tensor, feed that into this block_diag function to get a single (200, 200) covariance matrix, ravel X so that it’s (200,) with the correct structure, then feed it all into an MvNormal.

You could then reshape the output of the MvNormal to be (100, 2) if you so desired.
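
To make that wiring concrete, here is a rough sketch for the case where the per-observation covariances X_cov are already known numpy arrays (so they can be treated as data rather than sampled); names like big_cov are just placeholders and I haven’t run this at scale:

# Rough sketch: X is (100, 2) and X_cov is (100, 2, 2), both known numpy arrays
X_cov_at = at.as_tensor_variable(X_cov)   # known per-observation 2x2 covariances
big_cov = block_diag(X_cov_at)            # (200, 200) block-diagonal matrix

with pm.Model() as model:
    chol, corr, stds = pm.LKJCholeskyCov(
        "chol", n=2, eta=2.0, sd_dist=pm.Exponential.dist(1.0), compute_corr=True
    )
    μ = pm.Normal("μ", 0.0, 10, shape=2)
    X_latent = pm.MvNormal("X_latent", μ, chol=chol, shape=(len(X), 2))

    # Ravel the latent means and the data so their ordering matches the 2x2 blocks
    obs = pm.MvNormal("obs", mu=X_latent.reshape((-1,)), cov=big_cov,
                      observed=X.reshape(-1))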

There might also be a covariance function from the GP library you could use to get something like this with less hassle, but I’m not very well versed in all that. The main idea will be the same though: ravel and use a single structured covariance matrix in MvNormal.

Hi both,

I think Jesse’s solution should work. In fact, if X_cov is known in advance, you shouldn’t have to mess with making the matrix block diagonal in aesara; it should just work with scipy.linalg.block_diag:

from scipy.linalg import block_diag

block_diag_cov = block_diag(*covs)

where covs is e.g. a (100, 2, 2) array of your known error covariances. Then you should be able to run:

with pm.Model() as model:
    chol, corr, stds = pm.LKJCholeskyCov(
        "chol", n=2, eta=2.0, sd_dist=pm.Exponential.dist(1.0), compute_corr=True
    )
    cov = pm.Deterministic("cov", chol.dot(chol.T))
    μ = pm.Normal("μ", 0.0, 10, shape=2, testval=X.mean(axis=0))
    X_latent = pm.MvNormal("X_latent", μ, chol=chol, shape=(len(X),2))
    
    obs = pm.MvNormal(f"obs", X_latent.reshape((-1,)), block_diag_cov, observed=X.reshape(-1))

I would double check that these ravels work as expected (sometimes it can be confusing to flatten one row vs one column at a time), but I think that’s right.
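
A small numpy check of that ordering, independent of the model, just to make the point about row-major raveling:

import numpy as np

X_demo = np.arange(6).reshape(3, 2)  # rows are observations, columns are features
print(X_demo.ravel())                # [0 1 2 3 4 5]

The two features of each observation stay adjacent in the raveled vector, which is exactly what the 2x2 blocks on the diagonal assume.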

An alternative would be to just loop over your outcomes:

with pm.Model() as model:
    chol, corr, stds = pm.LKJCholeskyCov(
        "chol", n=2, eta=2.0, sd_dist=pm.Exponential.dist(1.0), compute_corr=True
    )
    cov = pm.Deterministic("cov", chol.dot(chol.T))
    μ = pm.Normal("μ", 0.0, 10, shape=2, testval=X.mean(axis=0))
    X_latent = pm.MvNormal("X_latent", μ, chol=chol, shape=(len(X),2))
    
    N = len(X)  # number of observations
    for i in range(N):
        cur_obs = pm.MvNormal(f"obs_{i}", X_latent[i], Sigmas[i], observed=X[i])

Here, Sigmas is your array of known covariance matrices (X_cov above).

It’s worth noting that neither of these options will be efficient for large N: the first will make a covariance matrix that is needlessly big, and the second will loop and create lots of variables. There is probably a way to make this much more efficient. Let us know though if this is good enough for your purposes!
