# Custom theano Op to do numerical integration

That is a normal cdf, its from pymc3.distributions.dist_math

If my [start,stop] is a given set of arrays, that is, only one parameter a is a variable, then how do I write it?

Dear friend,

Thank you very much for your code. I try a simple example to understand the code better. When I use one parameter everything is OK but when I pass two parameters, it doesn’t work. Following you can see my code. Any help would be greatly appreciated.

from scipy.integrate import quad
import theano
import theano.tensor as tt
import numpy as np

class Integrate(theano.Op):
def __init__(self, expr, var, *extra_vars):
super().__init__()
self._expr = expr
self._var = var
self._extra_vars = extra_vars
self._func = theano.function(
[var] + list(extra_vars),
self._expr,
on_unused_input='ignore')

def make_node(self, start, stop, *extra_vars):
self._extra_vars_node = extra_vars
assert len(self._extra_vars) == len(extra_vars)
self._start = start
self._stop = stop
vars = [start, stop] + list(extra_vars)
#vars = list(extra_vars)
return theano.Apply(self, vars, [tt.dscalar().type()])

def perform(self, node, inputs, out):
start, stop, *args = inputs
val = quad(self._func, start, stop, args=tuple(args))[0]
out[0][0] = np.array(val)

start, stop, *args = inputs
replace = dict(zip(self._extra_vars, args))

replace_ = replace.copy()
replace_[self._var] = start
dstart = out * theano.clone(-self._expr, replace=replace_)

replace_ = replace.copy()
replace_[self._var] = stop
dstop = out * theano.clone(self._expr, replace=replace_)

dargs = []
darg = out * integrate(start, stop, *args)
dargs.append(darg)

return [dstart, dstop] + dargs

y_obs = 8.3

start = theano.shared(1.)
stop = theano.shared(2.)
with pm.Model() as basic_model:
a = pm.Uniform('a', 1.5, 3.5)
b = pm.Uniform('b', 4., 6.)

t = tt.dscalar('t')
t.tag.test_value = np.zeros(())

func = t**a + b
integrate = it.Integrate(func,t,a,b)
mu = integrate(start,stop,a,b)

y = pm.Normal('y', mu=mu, sd=0.4, observed=y_obs)
trace = pm.sample(1500,tune=500, cores=2,chains=2)


There is a small mistake in the gradient part:

...
...


Also, the correct usage should be:

y_obs = 8.3

start = theano.shared(1.)
stop = theano.shared(2.)
with pm.Model() as basic_model:
a = pm.Uniform('a', 1.5, 3.5)
b = pm.Uniform('b', 4., 6.)

# Define the function to integrate in plain theano
t = tt.dscalar('t')
t.tag.test_value = np.zeros(())
a_ = tt.dscalar('a_')
a_.tag.test_value = np.ones(())*2.
b_ = tt.dscalar('b_')
b_.tag.test_value = np.ones(())*5.
func = t**a_ + b_
integrate = Integrate(func, t, a_, b_)

# Now we plug in the values from the model.
# The a_ and b_ from above corresponds to the a and b here.
mu = integrate(start, stop, a, b)
y = pm.Normal('y', mu=mu, sd=0.4, observed=y_obs)
trace = pm.sample(1500, tune=500, cores=2, chains=2)


Thanks for your help. I updated my code according to your point and now the code is correct but I face another problem. When I use NUTS and SMC step methods, the code works but when I use the Metropolis or Hamiltonian step methods, it doesn’t work. In fact, in the case of the Metropolis and Hamiltonian, the code only samples from the prior. It seems strange. Any help? Following I attached my codes.

integration.py (1.6 KB)

test2.py (1.1 KB)

Metropolis is likely not suitable for these kind of problem (as it is inefficient), and to make HMC to work you need to hand tune a lot of parameters.

Perhaps this is a really dumb question …

Would the custom Theano Op Integrate work when either start or stop are infinite? I wonder because it called scipy.intergrate.quad and this function does calculate infinite integrals.

Edit: I get the error TypeError: Cannot convert Type TensorType(float32, scalar) (of Variable a) into Type TensorType(float64, scalar). You can try to manually convert a into a TensorType(float64, scalar). when trying to run this.

Hi

This post has been very useful to me and I was able to actually do the integrals that I need. However, I get an error when actually running a model calculation. It is the same error I get when I run @AH5719 's scripts:

Traceback (most recent call last):
File "/home/lorenzo/.local/lib/python3.8/site-packages/pymc3/theanof.py", line 80, in floatX
return X.astype(theano.config.floatX)
AttributeError: 'list' object has no attribute 'astype'

During handling of the above exception, another exception occurred:

TypeError: float() argument must be a string or a number, not 'TensorVariable'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "test2.py", line 44, in <module>
y = pm.Normal('y', mu=mu, sd=0.1, observed=y_obs)
File "/home/lorenzo/.local/lib/python3.8/site-packages/pymc3/distributions/distribution.py", line 82, in __new__
dist = cls.dist(*args, **kwargs)
File "/home/lorenzo/.local/lib/python3.8/site-packages/pymc3/distributions/distribution.py", line 91, in dist
dist.__init__(*args, **kwargs)
File "/home/lorenzo/.local/lib/python3.8/site-packages/pymc3/distributions/continuous.py", line 488, in __init__
self.mean = self.median = self.mode = self.mu = mu = tt.as_tensor_variable(floatX(mu))
File "/home/lorenzo/.local/lib/python3.8/site-packages/pymc3/theanof.py", line 83, in floatX
return np.asarray(X, dtype=theano.config.floatX)
File "/opt/conda/lib/python3.8/site-packages/numpy/core/_asarray.py", line 83, in asarray
return array(a, dtype, copy=False, order=order)
ValueError: setting an array element with a sequence.


So, it seems that pm.Normal does not accept a list of Integrate class objects. I’m running the code with python 3.8.5, pymc3 3.9.2 and theano 1.0.5. Does anybody have an idea how to solve this? Thanks a lot!

I found a work around: just cast the list of integrals into a tensor variable, i.e. modify the following line in the test2.py file in the previous post to

y = pm.Normal('y', mu=tt.as_tensor_variable(mu), sd=0.1, observed=y_obs)

Now, everything runs well.

2 Likes

Thank you for all the help, this was very useful. I am trying to generalize the function to the case where the further parameters can be vectors or matrices, e.g. for the following case:

y_obs = 8.3
start = aesara.shared(1.)
stop = aesara.shared(2.)

with pm.Model() as basic_model:

a = pm.Uniform('a', 1.5, 3.5)
b = pm.Uniform(
'b',
4., 6.,
shape=(3)
)

# Define the function to integrate in plain pytensor
t = at.dscalar('t')
t.tag.test_value = np.zeros(())
a_ = at.dscalar('a_')
a_.tag.test_value = np.ones(())*2.

b_ = at.dvector('b_')
b_.tag.test_value = np.ones((3))*5.

func = t**a_ + b_.sum()
integrate = Integrate(
# Function we're integrating
func,
# variable we're integrating
t,
# other variables
a_, b_
)

# Now we plug in the values from the model.
# The a_ and b_ from above corresponds
# to the a and b here.
mu = integrate(
start,
stop,
a,
b
)
y = pm.Normal(
'y',
mu=mu,
sigma=0.4,
observed=y_obs
)

trace = pm.sample(
1500,
tune=500,
cores=2,
chains=2,
return_inferencedata=True
)


When I run the above with the Integrate Op as defined above in this thread, I get the following error:

ValueError: Integrate.grad returned a term with 0 dimensions, but 1 are required.

I assume we should specify somewhere that the gradient of b is 1-d now, but where should that happen? Thanks!

Hi folks,

I adapted the function to work with (either scalar or vector valued) functions f: R^{d_1}_1 \times ... \times R^{d_n}_n \rightarrow R^k, such that the argument wrt which we’re integrating is a scalar and the other arguments are either scalars or vectors.

import numpy as np
import pymc as pm
import pytensor as aesara
import pytensor.tensor as at

# Needed for integration
from pytensor.graph.op import Op
from pytensor.graph.basic import Apply
from pytensor import clone_replace

class Integrate(Op):

# Class to perform integration of a scalar variable
# on a bounded interval

# https://discourse.pymc.io/t/custom-theano-op-to-do-numerical-integration/734/12
# With some modifications!

def __init__(self, expr, var, *extra_vars):
"""
Parameters
----------
expr: Aesara Variable
The expression encoding the output
var: Aesara Variable
The input variable
"""
super().__init__()

# function we're integrating
self._expr = expr

# input var we're integrating over
self._var = var

# other variables
self._extra_vars = extra_vars

# transform expression into callable function
self._func = aesara.function(
# a list with all the inputs
[var] + list(extra_vars),
# output
self._expr,
on_unused_input='ignore'
)

def make_node(self, start, stop, *extra_vars):

self._extra_vars_node = extra_vars

# make sure that the same number of extra variables
# are passed here as were specified when defining the Op
assert len(self._extra_vars) == len(extra_vars)

# Define the bounds of integration
self._start = start
self._stop = stop

# return an Apply instance with the input and output Variable
return Apply(
# op: The operation that produces outputs given inputs.
op=self,
# inputs: The arguments of the expression modeled by the Apply node.
inputs=[start, stop] + list(extra_vars),
# outputs: The outputs of the expression modeled by the Apply node.
# NOTE: This is a scalar if self._expr is a scalar,
# and a vector if self._expr is a vector. Etc.
outputs=[self._expr.type()]
)

def perform(self, node, inputs, out):
"""
Out is the output storage.
Inputs are passed by value.
A single output is returned indirectly
as the first element of single-element lists (out)

NOTE: There's a restriction, namely the variable to integrate
has to be a scalar, even though the other variables can vector.

Parameters
----------
node: Apply node
The output of make_node
inputs: List of data
The data can be operated on with numpy
out: List
output_storage is a list of storage cells where the output
is to be stored. There is one storage cell for each output of the Op.
The data put in output_storage must match the type of the symbolic output.
"""

# Runs the computation in python
start, stop, *args = inputs

if self._expr.ndim == 0:
self._func,
start,
stop,
args=tuple(args)
)[0]
elif self._expr.ndim == 1:
# if the function is vector-valued
# (e.g., the gradient of a vector),
self._func,
start,
stop,
args=tuple(args)
)[0]
else:
shape = self._func(
start,
*args
).shape

def helpfunc(*args):
return self._func(*args).flatten()

helpfunc,
start,
stop,
args=tuple(args)
)[0].reshape(shape)

# in-place modification of "out".
# out is a single-element list
out[0][0] = np.array(val)

"""
NOTE: This function does not calculate the gradient
but rather implements part of the chain rule,
for an explanation

Inputs in this case contains:
[lower integration bound, upper integration bound, ...other variables of function]
"""

# unpack the input
start, stop, *args = inputs

# dictionary with the extra variables as keys
# and the extra variables in "inputs" as values
replace = dict(zip(
self._extra_vars,
args
))

# The derivative of integral wrt to the upper limit of integration
# is just the value of the function at that limit
# (for lower limit, it's minus the function)
# See e.g.,
# https://math.stackexchange.com/questions/984111/differentiating-with-respect-to-the-limit-of-integration
replace_ = replace.copy()
replace_[self._var] = start
dstart = out * clone_replace(
# Clone a graph and replace subgraphs within it.
# It returns a copy of the initial subgraph with the corresponding
# substitutions.
-self._expr,
# Dictionary describing which subgraphs should be replaced by what.
replace=replace_
)

replace_ = replace.copy()
replace_[self._var] = stop
dstop = out * clone_replace(
self._expr,
replace=replace_
)

# calculate the symbolic gradient of self._expr
# wrt each extra variable.
# This can be done because they're symbolic aesara variables!
# This corresponds to the gradient of the expression
# *inside* the integral (the inner part of Leibniz'
# integral rule)
# cost
self._expr,
# wrt
self._extra_vars
)

dargs = []
# loop over the gradients of the extra vars

# define an Apply node
integrate = Integrate(
# variable we're integrating over
self._var,
*self._extra_vars
)

# Apply Leibniz' integral rule:
# call integrate, which evaluates
# the integral of the gradient.
# And then multiply with previous gradient
# that was passed in the input.
# NOTE: This is not actually doing the operation,
# but rather calling make_node, which *creates the node*
# that does the operation
darg = at.dot(
integrate(
start, stop,
*args
).T,
out
)

dargs.append(darg)

# return a list with one Variable for each input in inputs
return [dstart, dstop] + dargs


I changed a few things:

• Apply in the return value of make_node now gets outputs=[self._expr.type()], since the expression can be any shape.
• perform adapts depending on the shape of the output to self._func. If the output is a scalar, it uses quad. If it is a vector, it uses quad_vec. Otherwise, it defines a helper function that flattens the output of self._func, then runs quad, and finally reshapes it back into the original shape.
• The previous point is useful because in grad() we integrate, with respect to self._var, the Jacobian of self_expr with respect to each of the extra variables. Since some of the extra variables can be vectors and self_var can be a vector, the Jacobian can be up to 2 dimensional.

I tested the code with a modified version of the example above, where one of the inputs (b) is now a vector, and the function to integrate, i.e., f(a, b, t) = t^a + b, is vector valued. We are calculating \int_1^2 f(a, b, t) dt:

### Example of usage

y_obs = np.array([8.3, 8.0, 7.8])
start = aesara.shared(1.)
stop = aesara.shared(2.)

with pm.Model() as basic_model:

a = pm.Uniform('a', 1.5, 3.5)
b = pm.Uniform(
'b',
4., 6.,
shape=(3)
)

# Define the function to integrate in plain pytensor
t = at.dscalar('t')
t.tag.test_value = np.zeros(())

a_ = at.dscalar('a_')
a_.tag.test_value = np.ones(())*2.

b_ = at.dvector('b_')
b_.tag.test_value = np.ones((3))*5.

func = t**a_ + b_
integrate = Integrate(
# Function we're integrating
func,
# variable we're integrating
t,
# other variables
a_, b_
)

# Now we plug in the values from the model.
# The a_ and b_ from above corresponds
# to the a and b here.
mu = integrate(
start,
stop,
a,
b
)
y = pm.Normal(
'y',
mu=mu,
sigma=0.4,
observed=y_obs
)

with basic_model:
trace = pm.sample(
1500,
tune=500,
cores=2,
chains=2,
return_inferencedata=True
)


Any feedback would be greatly appreciated - there might be some mistakes. I have also used this for a more complex model and it’s extremely slow, so it would be awesome to find ways of making it faster. Thanks!

Looks correct. You can set a flag self._func.trust_input=True after you first define _func. That will skip checking the type of the inputs which can cause quise some overhead when calling the function.

If you are interested we could add this as a utility directly in PyTensor (our current backend/fork of Aesara).

We would probably tweak it slightly in that case. Usually we use FunctionGraph to represent inner graphs and only compile as a function later on. There is an example of a new Loop Op that we are building you can have a look at: Implement new Loop and Scan operators by ricardoV94 · Pull Request #191 · pymc-devs/pytensor · GitHub (specifically look at pytensor/loop/op.py::Loop)

Thank you for the help! I tried the advice re trust_input.

Adding self._func.trust_input=True in __init__ right after the definition of self._func makes the kernel die when running the example in a jupyter notebook. When running the code from the command line, it raises a Segmentation fault.

Any additional ideas on how to make it faster would also be great!

That means some of the inputs are not of the right type

I am struggling to see what the problem could be - a, b, and t all seem to be the right type to me in the example above. I tried with scalar b and it still raises the Segmentation error. Any thoughts / hints?

Can you print their types (and dtypes) in the perform method (before you pass them to _func)? That should be pretty clear. Maybe you have python literals you need to convert to numpy arrays.

print(inputs)
print([type(a) for a in inputs])
print([a.dtype for a in inputs])


to perform, and when running the example above the output is:

Auto-assigning NUTS sampler...
[array(1.), array(2.), array(2.08049756), array([5.46187651, 5.20988144, 4.94399615])]
[<class 'numpy.ndarray'>, <class 'numpy.ndarray'>, <class 'numpy.ndarray'>, <class 'numpy.ndarray'>]
[dtype('float64'), dtype('float64'), dtype('float64'), dtype('float64')]
Segmentation fault


The inputs are of the expected shapes and correspond to start, stop, a, and b.

Unless I am missing something, this looks as it should, so I guess the problem must be somewhere else (?).

What are the node input types? If setting trust input to false is causing a segfault it must be that the function was converting them to other types (maybe float32?)

Everything seems to be right:

Adding this to __init__:

print("Input types: ", [i.dtype for i in [var] + list(extra_vars)])
print("Output type: ", self._expr.type, self._expr.dtype)


prints out:

Input types:  ['float64', 'float64', 'float64']
Output type:  TensorType(float64, (?,)) float64


I also added this to make_node:

print("Make node input types: ", [
(i.type, i.dtype)
for i in
[self._start, self._stop] + list(self._extra_vars_node)
])
print(
"Make node output type: ",
self._expr.type()
)


which prints:

Make node input types:  [(TensorType(float64, ()), 'float64'), (TensorType(float64, ()), 'float64'), (TensorType(float64, ()), 'float64'), (TensorType(float64, (3,)), 'float64')]
Make node output type:  <TensorType(float64, (?,))>


I thought the problem could be that it doesn’t know the exact output shape, but passing it directly doesn’t seem to solve the problem either.

Hmm, by this point I would have to go through the self._func.__call__ and see what change is happening when trust_input is True. Anyway, that’s likely a rabbit hole not worth pursuing