Custom Covariance Function for Gaussian Process

Thank you all so much for this amazing package!
I am using this library for Gaussian Process modeling. I am particularly interested in implementing an event-triggered covariance function. Here is a visualization of the covariance matrix (the second matrix).


Here, the x and y axes denote the time series. The idea is that some events trigger an increase in variance at subsequent time points. Does anyone know what kind of kernel I should use, or how to define a custom kernel? Due to the discontinuity at 0, I am not sure I can get away without some conditional statement, which would not be differentiable.

Greatly appreciated if you know how to implement such a kernel or if you have some ideas on where I can start!


Welcome!

Perhaps @bwengals or @fonnesbeck might chime in on this?

Thank you for your replies!
I implemented a custom kernel here, but the sampling is very inefficient and compute time blows up with sample size. The idea is that X consists of two columns: time steps and rewarded_array. The latter indicates that a reward was received, and triggers the reward-sensitive kernel to make subsequent time-step outcomes more similar to the current, rewarded outcome.
My code is a brute-force way of implementing this… I am guessing the bottleneck is inverting the large covariance matrix. Any ideas on how to optimize this for efficient sampling?

import pymc as pm
import numpy as np
import pytensor.tensor as pt

# define the reward sensitive kernel
class RS_KERNEL(pm.gp.cov.Covariance):
    def __init__(self, sigma, l_rs, input_dim, active_dims=None):
        super(RS_KERNEL, self).__init__(input_dim, active_dims)
        self.sigma = sigma
        self.l_rs = l_rs

    def square_exp_kernel(self, r, sigma, l):
        """
        Compute the squared exponential kernel.
        """
        return sigma**2 * pt.exp(-r**2 / (2 * l**2))

    def diag(self, X):
        return pt.ones(X.shape[0])

    def full(self, X, Xs=None):
        if Xs is None:
            Xs = X

        # Compute squared exponential using the first column (T)
        r = pt.abs(X[:, 0][:, None] - Xs[:, 0])
        K_se = self.square_exp_kernel(r, self.sigma, self.l_rs)

        # Generate mask using the second column (rewarded_array)
        mask = pt.zeros_like(K_se)
        for i in range(X.shape[0]):
            if X[i, 1] == 1:
                mask = pt.set_subtensor(mask[i, :], 1)
                mask = pt.set_subtensor(mask[:, i], 1)

        return K_se * mask

# Example data with two columns
T = 1000
data = np.column_stack((np.arange(T), rewarded_trials))
# Now that we have the simulation process, we want to use the data to infer the parameters of the model.

with pm.Model() as model:
    # Priors
    sig_g = pm.HalfCauchy('sig_g', beta=2.5)
    ALPHA = pm.Beta('ALPHA', alpha=1, beta=1)
    l_se = pm.Exponential('l_se', 1/20)
    l_rs = pm.Exponential('l_rs', 1/5)
    sigma_private = pm.HalfCauchy('sigma_private', beta=2.5)
    
    # Covariance functions
    cov_se = sig_g**2 * pm.gp.cov.ExpQuad(2, l_se, active_dims=[0])
    cov_rs = RS_KERNEL(sig_g, l_rs, 2, active_dims=[0,1])
    cov_private_noise = pm.gp.cov.WhiteNoise(sigma_private)
    
    # Combined covariance
    K = ALPHA * cov_se + (1 - ALPHA) * cov_rs + cov_private_noise

    # Gaussian Process
    gp = pm.gp.Marginal(cov_func=K)
    y_ = gp.marginal_likelihood('y', X=data, y=Y_result_final, noise=sigma_private)
    
    # Sampling
    trace = pm.sample(1000, chains=2, tune=1000, target_accept=0.9)
        # Generate mask using the second column (rewarded_array)
        mask = pt.zeros_like(K_se)
        for i in range(X.shape[0]):
            if X[i, 1] == 1:
                mask = pt.set_subtensor(mask[i, :], 1)
                mask = pt.set_subtensor(mask[:, i], 1)

        return K_se * mask

This bit looks ripe for optimization. Generally, for loops are problematic in PyMC because they cannot be compiled by pytensor, so most of your code executes at machine speed and then a chunk executes at Python speed. We might be able to use something like:

bool_vector = pt.eq(X[:,1],1)
mask = pt.switch(bool_vector, [IF RESULT], [ELSE RESULT])
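
As a toy illustration of the mechanism (not your mask specifically), pt.switch picks elementwise between two branches without any Python-level conditional, so everything stays inside the compiled graph:

import pytensor.tensor as pt

x = pt.vector("x")
flag = pt.eq(x, 1)                  # elementwise boolean, true where x == 1
out = pt.switch(flag, 10.0, 0.0)    # 10 where flag is true, 0 elsewhere
print(out.eval({x: [0.0, 1.0, 1.0, 0.0]}))  # -> [ 0. 10. 10.  0.]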

Could you say a bit about what you are trying to accomplish with:

        if X[i, 1] == 1:
            mask = pt.set_subtensor(mask[i, :], 1)
            mask = pt.set_subtensor(mask[:, i], 1)

It’s not jumping out to me why you might immediately write over the first mask with the second.


Thanks for the response.
The idea is to generate a mask based on reward_trials. If reward_trials == 1 at index i, we want to set the entries after and below the diagonal element (i, i) to 1. Applying this mask to the original kernel produces the second kernel matrix shown in the figure.
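
Following the pt.switch suggestion, a vectorized version of that mask might look something like this (an untested sketch, assuming X and K_se as in the full() method above; it reproduces what the loop does, i.e. whenever X[i, 1] == 1 the whole of row i and column i of the mask becomes 1):

import pytensor.tensor as pt

# untested sketch: vectorized replacement for the masking loop in full()
reward = pt.eq(X[:, 1], 1)                              # true where a trial was rewarded
row_or_col = pt.or_(reward[:, None], reward[None, :])   # row i OR column i rewarded
mask = pt.switch(row_or_col, 1.0, 0.0)
K_masked = K_se * mask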

Hi Bowen, have you been able to test your covariance kernel on its own? I tried to test the full method with:

import numpy as np
from scipy import stats
import pymc as pm

# sample data: time steps (Y) and random Bernoulli events (X) as the two columns
X = stats.binom(n=1, p=0.1).rvs(size=10)
Y = np.arange(0, 10)
X = np.stack((Y, X))
X = X.T

# Priors
sig_g = pm.HalfCauchy.dist(beta=2.5)
l_rs = pm.Exponential.dist(1/5)

# Covariance functions
cov_rs = RS_KERNEL(sig_g, l_rs, 2, active_dims=[0,1])
pm.draw(cov_rs.full(X))

and ran into an error compiling the graph (the message was hard to interpret). Similarly, I tried to sample from the prior of the whole model and ran into a similar compilation error. These often arise from incompatible shapes or invalid operations.

with pm.Model() as model:
    # Priors
    sig_g = pm.HalfCauchy('sig_g', beta=2.5)
    ALPHA = pm.Beta('ALPHA', alpha=1, beta=1)
    l_se = pm.Exponential('l_se', 1/20)
    l_rs = pm.Exponential('l_rs', 1/5)
    sigma_private = pm.HalfCauchy('sigma_private', beta=2.5)
    
    # Covariance functions
    cov_se = sig_g**2 * pm.gp.cov.ExpQuad(2, l_se, active_dims=[0])
    cov_rs = RS_KERNEL(sig_g, l_rs, 2, active_dims=[0,1])
    cov_private_noise = pm.gp.cov.WhiteNoise(sigma_private)
    
    # Combined covariance
    K = ALPHA * cov_se + (1 - ALPHA) * cov_rs + cov_private_noise

    # Gaussian Process
    gp = pm.gp.Latent(cov_func=K)
    y_ = gp.prior('y', X=X)
    
    # Sampling
    # trace = pm.sample(1000, chains=2, tune=1000, target_accept=0.9)
    
    pm.draw(y_)

My goal was to figure out whether the for loop with the masks is behaving the way it is supposed to, but no luck until I can reproduce the kernel locally.
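
In the meantime, one way to check what the masking loop is meant to produce, independent of pytensor, is to run the same logic in plain NumPy on the toy X above (just a sketch of the intended output, not the actual graph):

import numpy as np

# plain-NumPy version of the masking loop, to inspect the intended mask
n = X.shape[0]
mask = np.zeros((n, n))
for i in range(n):
    if X[i, 1] == 1:
        mask[i, :] = 1
        mask[:, i] = 1

print(mask)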