Dear Pymc3 experts,
I am trying to use a cognitive model called RDM(racing diffusion model) in pymc3 and I confront some problems. Racing diffusion model is quite similar to the linear ballistic accumulator model, assuming the number of evidence accumulators is identical to the option number. The biggest difference between RDM and LBA is that RDM allows inner-trial noise. For RDM, each accumulator PDF is a shift wald distribution. The mathematical details of racing diffusion model could be found here. Thomas and Folter build a wonderful python package called glambox which includes the racing diffusion model. I copied their code and try to fit some faked data with the model, however I confront the SamplingError. Could you help me fix this problem? Thanks a lot.
import theano.tensor as tt
import numpy as np
import pymc3 as pm
def tt_normal_cdf(x, mu=0, sd=1):
"""
Normal cumulative distribution function
Theano tensor implementation
"""
return (0.5 + 0.5 * tt.erf((x - mu) / (sd * tt.sqrt(2.))))
def tt_wienerpos_fpt_pdf(t, drift, noise, boundary):
"""
Probability density function of first passage times of
Wiener process with positive drift towards constant boundary.
Theano tensor implementation
Cf https://en.wikipedia.org/wiki/Inverse_Gaussian_distribution#Relationship_with_Brownian_motion
"""
mu = boundary / drift
lam = (boundary**2 / noise**2)
return ((lam[:,None] / (2 * np.pi * t**3))**0.5 * tt.exp(
(-lam[:,None] * (t - mu[:,None])**2) / (2 * mu[:,None]**2 * t)))
def tt_wienerpos_fpt_cdf(t, drift, noise, boundary, numerical_stability=100):
"""
Cumulative distribution function of first passage times of
Wiener process with positive drift towards constant boundary.
Theano tensor implementation
Cf https://en.wikipedia.org/wiki/Inverse_Gaussian_distribution#Relationship_with_Brownian_motion
"""
mu = boundary / drift
lam = (boundary / noise)**2
bounded_ratio = tt.where(lam / mu >= numerical_stability,
numerical_stability, lam / mu)
return (tt_normal_cdf(tt.sqrt(lam[:,None] / t) * (t / mu[:,None] - 1)) +
tt.exp(2 * bounded_ratio[:,None]) * tt_normal_cdf(-(tt.sqrt(lam[:,None] / t) * (t / mu[:,None] + 1))))
def tt_wienerrace_pdf(t, drift, noise, boundary,starting, t0, zerotol=1e-14):
"""
Probability density function of first passage times of
a race between multiple Wiener processes with positive drift
towards a constant boundary.
Theano tensor implementation
"""
# Nondecision time T0
t = t - t0
t = tt.where(t <= 0, 0, t)
# boundary parameter
k = boundary * (1-starting)
# first passage time densities, single Wiener accumulators
f = tt_wienerpos_fpt_pdf(t, drift, noise, k)
# first passage time distributions, single Wiener accumulators
F = tt_wienerpos_fpt_cdf(t, drift, noise, k)
# survival functions
S = 1. - F
# race densities
# Note: drifts should be sorted so that chosen item drift is in first column
l = f[:, 0] * tt.prod(S[:, 1:], axis=1)
return l
def contaminant_lf(upper,lower):
"""
:param upper: upper bound of RT
:param lower: lower bound of RT
"""
return 1/(upper-lower)
def tt_wienerrace_con_pdf(t, drift, noise, boundary,starting, t0,upper,lower,p):
"""
:param p: proportion of contaminant data
:return: likelihood function of racing diffusion model with contaminant data
"""
return (1-p)*tt_wienerrace_pdf(t, drift, noise, boundary,starting, t0) + p*contaminant_lf(upper,lower)
def tt_wienerrace_llf(x, drift, noise, boundary,starting, t0,upper,lower,p):
return tt.sum(tt.log(tt_wienerrace_con_pdf(x, drift, noise, boundary,starting, t0,upper,lower,p)))
def wienerrace_rng(v,z,A,ndt,noise,upper,lower,p,size):
"""
racing diffusion model random generator
v: vector for drift rates
z: vecctor for starting bias
A: boundary
ndt: non-decision time
noise: scaling parameter for wiener process
upper: upper bound of RT
lower: lower bound of RT
p: contaminant proportion
size: simulation number
"""
option_num = len(v)
v = np.array(v)
z = np.array(z)
k = A*(1 - z)
mu = k / v
lam = (k**2 / noise**2)
rt_raw = np.zeros([option_num,size])
choice = np.zeros([size])
for n in range(option_num):
rt_raw[n,:] = np.random.wald(mu[n],lam[n],size)
for i in range(size):
choice[i] = np.where(rt_raw[:,i]==np.min(rt_raw[:,i]))[0][0]
rt_raw = rt_raw.T + ndt
rt = rt_raw[np.arange(size),choice.astype('int32')]
## adding contanminent
p_sample = np.random.uniform(0,1,size)
cri = p_sample<p
rt[cri] = np.random.uniform(lower,upper,len(cri[cri==True]))
choice[cri] = np.random.binomial(size=len(cri[cri==True]),n=option_num-1,p=1/option_num)
return rt,choice
class rdm_wfpm(pm.Continuous):
def __init__(self,drift, noise, boundary,starting, t0,upper,lower,p,**kwargs):
self.drift = tt.as_tensor_variable(pm.floatX(drift))
self.noise = tt.as_tensor_variable(pm.floatX(noise))
self.boundary = tt.as_tensor_variable(pm.floatX(boundary))
self.starting = tt.as_tensor_variable(pm.floatX(starting))
self.t0 = tt.as_tensor_variable(pm.floatX(t0))
self.upper = tt.as_tensor_variable(pm.floatX(upper))
self.lower = tt.as_tensor_variable(pm.floatX(lower))
self.p = tt.as_tensor_variable(pm.floatX(p))
super().__init__(**kwargs)
def logp(self,value):
return tt_wienerrace_llf(value,
drift=self.drift,
noise=self.noise,
boundary=self.boundary,
starting=self.starting,
t0=self.t0,
upper=self.upper,
lower=self.lower,
p=self.p)
The code above is the likelihood function of racing diffusion model.
#simulate data
rt,choice = wienerrace_rng(v=[1,2],z=[0,0],A=3,ndt=0.4,noise=1,upper=5,lower=0.1,p=0.05,size=200)
data =np.array([rt,choice])
#classify the simulated data
right_data = data[:,data[1,:]==1]
left_data = data[:,data[1,:]==0]
#build pymc3 model
with pm.Model() as rdm:
v1 = pm.Normal('drift_1',mu=0,sigma=1,shape=1,testval = 0)
z1 = pm.Normal('bias_1',0,1,shape=1,testval = 0)
v2 = pm.Deterministic('v2',pm.math.exp(pm.Normal('drift_2',mu=0,sigma=1,shape=1,testval = 0)))
z2 = pm.Normal('bias_2',mu=0,sigma=1,shape=1)
A = pm.Deterministic('boundary_',pm.math.exp(pm.Normal('boundary',mu=0,sigma=1,shape=1,testval = 0)))
z1_ = pm.Deterministic('bias_1_',pm.math.log(z1))
z2_ = pm.Deterministic('bias_2_',pm.math.log(z2))
ndt = pm.Normal('ndt',0,1,shape=1,testval=-5)
ndt_ = pm.Deterministic('ndt_',pm.math.exp(ndt))
p = pm.Normal('p',0,shape=1)
p_tran = pm.Deterministic('p_',pm.math.log(p))
left_drift_vector = pm.Deterministic('left_drift',pm.math.concatenate((v1,v2)))
right_drift_vector = pm.Deterministic('right_drift',pm.math.concatenate((v2,v1)))
left_z = pm.Deterministic('left_z',pm.math.concatenate((z1_,z2_)))
right_z = pm.Deterministic('right_z',pm.math.concatenate((z2_,z1_)))
y_left = rdm_wfpm(name='x_left',drift=left_drift_vector, noise=1, boundary=A,starting=left_z, t0=ndt_,upper=5,lower=0.1,p=p_tran,observed=left_data)
y_right = rdm_wfpm(name='y_left',drift=right_drift_vector, noise=1, boundary=A,starting=right_z, t0=ndt_,upper=5,lower=0.1,p=p_tran,observed=right_data)
#sampling
with rdm:
trace = pm.sample()
And here is the error:
SamplingError: Initial evaluation of model at starting point failed!
Starting values:
{'drift_1': array([0.]), 'bias_1': array([0.]), 'drift_2': array([0.]), 'bias_2': array([0.]), 'boundary': array([0.]), 'ndt': array([-5.])}
Initial evaluation results:
drift_1 -0.92
bias_1 -0.92
drift_2 -0.92
bias_2 -0.92
boundary -0.92
ndt -13.42
x_left NaN
y_left NaN
Name: Log-probability of test_point, dtype: float64