Thank you for your replies!
I implemented a custom kernel here, but sampling is very inefficient and compute time blows up with sample size. The idea is that X consists of two columns: time steps and a rewarded_array. The latter indicates whether a reward was received; a reward triggers the reward-sensitive kernel, which makes subsequent time-step outcomes more similar to the current, rewarded outcome.
My code is a brute-force way of implementing this. I am guessing the bottleneck is inverting the large covariance matrix. Any ideas on how to optimize this for efficient sampling?
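Concretely (as implemented below), the reward-sensitive kernel I have in mind is roughly k((t_i, r_i), (t_j, r_j)) = sigma^2 * exp(-(t_i - t_j)^2 / (2 * l_rs^2)) when trial i or trial j was rewarded (r = 1), and 0 otherwise.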
import pymc as pm
import numpy as np
import pytensor.tensor as pt


# Define the reward-sensitive kernel
class RS_KERNEL(pm.gp.cov.Covariance):
    def __init__(self, sigma, l_rs, input_dim, active_dims=None):
        super().__init__(input_dim, active_dims)
        self.sigma = sigma
        self.l_rs = l_rs

    def square_exp_kernel(self, r, sigma, l):
        """Compute the squared exponential kernel."""
        return sigma**2 * pt.exp(-r**2 / (2 * l**2))

    def diag(self, X):
        return pt.ones(X.shape[0])

    def full(self, X, Xs=None):
        if Xs is None:
            Xs = X
        # Squared exponential computed on the first column (time steps)
        r = pt.abs(X[:, 0][:, None] - Xs[:, 0])
        K_se = self.square_exp_kernel(r, self.sigma, self.l_rs)
        # Build the mask from the second column (rewarded_array):
        # switch on the rows and columns belonging to rewarded trials
        mask = pt.zeros_like(K_se)
        for i in range(X.shape[0]):
            if X[i, 1] == 1:
                mask = pt.set_subtensor(mask[i, :], 1)
                mask = pt.set_subtensor(mask[:, i], 1)
        return K_se * mask
# Example data with two columns
T = 1000
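# Placeholder so this snippet runs on its own -- in my actual code,
# rewarded_trials is the 0/1 reward indicator produced by the simulation
rewarded_trials = np.random.binomial(1, 0.2, size=T)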
data = np.column_stack((np.arange(T), rewarded_trials))
# Now that we have the simulation process, we want to use the data to infer the parameters of the model.
with pm.Model() as model:
    # Priors
    sig_g = pm.HalfCauchy('sig_g', beta=2.5)
    ALPHA = pm.Beta('ALPHA', alpha=1, beta=1)
    l_se = pm.Exponential('l_se', 1/20)
    l_rs = pm.Exponential('l_rs', 1/5)
    sigma_private = pm.HalfCauchy('sigma_private', beta=2.5)

    # Covariance functions
    cov_se = sig_g**2 * pm.gp.cov.ExpQuad(2, l_se, active_dims=[0])
    cov_rs = RS_KERNEL(sig_g, l_rs, 2, active_dims=[0, 1])
    cov_private_noise = pm.gp.cov.WhiteNoise(sigma_private)

    # Combined covariance: mixture of the smooth and reward-sensitive kernels, plus white noise
    K = ALPHA * cov_se + (1 - ALPHA) * cov_rs + cov_private_noise

    # Gaussian process marginal likelihood
    # (Y_result_final is the outcome vector produced by the simulation above)
    gp = pm.gp.Marginal(cov_func=K)
    y_ = gp.marginal_likelihood('y', X=data, y=Y_result_final, noise=sigma_private)

    # Sampling
    trace = pm.sample(1000, chains=2, tune=1000, target_accept=0.9)
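In case it helps to see what the loop is doing: when Xs is X and the reward column is strictly 0/1, I believe the mask that full() builds is equivalent to this vectorized (untested) sketch, which avoids the Python loop over rows:

import pytensor.tensor as pt

def reward_mask(X):
    # Untested sketch of what the mask loop in full() computes,
    # assuming the reward column contains only 0s and 1s and Xs is X:
    # mask[i, j] = 1 if trial i or trial j was rewarded, else 0
    rew = X[:, 1]
    return pt.maximum(rew[:, None], rew[None, :])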