Here is the code for LogLikeWithGrad_full. I adapted it directly from the backbox likelihood function example in pyMC documentation. The difference is that here I need to fit the likelihood function on not one subject’s data but 94.
import theano.tensor as tt
import numpy as np
import warnings
def gradients(vals, func, releps=1e-3, abseps=None, mineps=1e-9, reltol=1e-3,
epsscale=0.5):
"""
Calculate the partial derivatives of a function at a set of values. The
derivatives are calculated using the central difference, using an iterative
method to check that the values converge as step size decreases.
Parameters
----------
vals: array_like
A set of values, that are passed to a function, at which to calculate
the gradient of that function
func:
A function that takes in an array of values.
releps: float, array_like, 1e-3
The initial relative step size for calculating the derivative.
abseps: float, array_like, None
The initial absolute step size for calculating the derivative.
This overrides `releps` if set.
`releps` is set then that is used.
mineps: float, 1e-9
The minimum relative step size at which to stop iterations if no
convergence is achieved.
epsscale: float, 0.5
The factor by which releps if scaled in each iteration.
Returns
-------
grads: array_like
An array of gradients for each non-fixed value.
"""
grads = np.zeros(len(vals))
# maximum number of times the gradient can change sign
flipflopmax = 10.
# set steps
if abseps is None:
if isinstance(releps, float):
eps = np.abs(vals)*releps
eps[eps == 0.] = releps # if any values are zero set eps to releps
teps = releps*np.ones(len(vals))
elif isinstance(releps, (list, np.ndarray)):
if len(releps) != len(vals):
raise ValueError("Problem with input relative step sizes")
eps = np.multiply(np.abs(vals), releps)
eps[eps == 0.] = np.array(releps)[eps == 0.]
teps = releps
else:
raise RuntimeError("Relative step sizes are not a recognised type!")
else:
if isinstance(abseps, float):
eps = abseps*np.ones(len(vals))
elif isinstance(abseps, (list, np.ndarray)):
if len(abseps) != len(vals):
raise ValueError("Problem with input absolute step sizes")
eps = np.array(abseps)
else:
raise RuntimeError("Absolute step sizes are not a recognised type!")
teps = eps
# for each value in vals calculate the gradient
count = 0
for i in range(len(vals)):
# initial parameter diffs
leps = eps[i]
cureps = teps[i]
flipflop = 0
# get central finite difference
fvals = np.copy(vals)
bvals = np.copy(vals)
# central difference
fvals[i] += 0.5*leps # change forwards distance to half eps
bvals[i] -= 0.5*leps # change backwards distance to half eps
cdiff = (func(fvals)-func(bvals))/leps
while 1:
fvals[i] -= 0.5*leps # remove old step
bvals[i] += 0.5*leps
# change the difference by a factor of two
cureps *= epsscale
if cureps < mineps or flipflop > flipflopmax:
# if no convergence set flat derivative (TODO: check if there is a better thing to do instead)
warnings.warn("Derivative calculation did not converge: setting flat derivative.")
grads[count] = 0.
break
leps *= epsscale
# central difference
fvals[i] += 0.5*leps # change forwards distance to half eps
bvals[i] -= 0.5*leps # change backwards distance to half eps
cdiffnew = (func(fvals)-func(bvals))/leps
if cdiffnew == cdiff:
grads[count] = cdiff
break
# check whether previous diff and current diff are the same within reltol
rat = (cdiff/cdiffnew)
if np.isfinite(rat) and rat > 0.:
# gradient has not changed sign
if np.abs(1.-rat) < reltol:
grads[count] = cdiffnew
break
else:
cdiff = cdiffnew
continue
else:
cdiff = cdiffnew
flipflop += 1
continue
count += 1
return grads
class LogLikeGrad_full(tt.Op):
"""
This Op will be called with a vector of values and also return a vector of
values - the gradients in each dimension.
"""
itypes = [tt.dmatrix]
otypes = [tt.dmatrix]
def __init__(self, loglike, data):
"""
Initialise with various things that the function requires. Below
are the things that are needed in this particular example.
Parameters
----------
loglike:
The log-likelihood (or whatever) function we've defined
data:
The "observed" data that our log-likelihood function takes in
x:
The dependent variable (aka 'x') that our model requires
sigma:
The noise standard deviation that out function requires.
"""
# add inputs as class attributes
self.likelihood = loglike
self.data = data
def perform(self, node, inputs, outputs):
(param,) = inputs
grads = np.empty(param.shape)
for i in range(len(self.data)):
# define version of likelihood function to pass to derivative function
def lnlike(values):
return self.likelihood(values, self.data[i])
# calculate gradients
grads[i] = gradients(param[i], lnlike)
outputs[0][0] = grads.T
class LogLikeWithGrad_full(tt.Op):
"""
Specify what type of object will be passed and returned to the Op when it is
called. In our case we will be passing it a vector of values (the parameters
that define our model) and returning a single "scalar" value (the
log-likelihood)
"""
itypes = [tt.dmatrix] # expects a vector of parameter values when called
otypes = [tt.dvector] # outputs a single scalar value (the log likelihood)
def __init__(self, loglike, data):
"""
Initialise the Op with various things that our log-likelihood function
requires. Below are the things that are needed in this particular
example.
Parameters
----------
loglike:
The log-likelihood (or whatever) function we've defined
data:
The "observed" data that our log-likelihood function takes in
x:
The dependent variable (aka 'x') that our model requires
sigma:
The noise standard deviation that our function requires.
"""
# add inputs as class attributes
self.likelihood = loglike
self.data = data
# initialise the gradient Op (below)
self.logpgrad = LogLikeGrad_full(self.likelihood, self.data)
def perform(self, node, inputs, outputs):
# the method that is used when calling the Op
(param,) = inputs # this will contain my variables
# call the log-likelihood function
logl = [self.likelihood(param.T[i], self.data[i]) for i in range(len(param.T))]
outputs[0][0] = np.array(logl) # output the log-likelihood
def grad(self, inputs, g):
# the method that calculates the gradients - it actually returns the
# vector-Jacobian product - g[0] is a vector of parameter values
(param,) = inputs # our parameters
return [g[0] * self.logpgrad(param.T)]