Custom theano Op to do numerical integration

That is a normal CDF; it's from pymc3.distributions.dist_math.
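
For reference, a minimal sketch of what such a helper looks like (assuming std_cdf is the function in question, as shipped in pymc3 3.x):

import numpy as np
import theano.tensor as tt
from pymc3.distributions.dist_math import std_cdf  # standard-normal CDF helper

x = tt.dscalar('x')
cdf = std_cdf(x)  # symbolic Phi(x)
# equivalent plain-theano expression, for comparison:
cdf_manual = 0.5 + 0.5 * tt.erf(x / np.sqrt(2.0))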

If my [start, stop] bounds are given as arrays, i.e. only the parameter a is a variable, how do I write it?

Dear friend,

Thank you very much for your code. I tried a simple example to understand the code better. When I use one parameter everything is OK, but when I pass two parameters it doesn't work. My code is below. Any help would be greatly appreciated.

from scipy.integrate import quad
import theano
import theano.tensor as tt
import numpy as np
import pymc3 as pm


class Integrate(theano.Op):
    def __init__(self, expr, var, *extra_vars):
        super().__init__()
        self._expr = expr
        self._var = var
        self._extra_vars = extra_vars
        self._func = theano.function(
            [var] + list(extra_vars),
            self._expr,
            on_unused_input='ignore')
    
    def make_node(self, start, stop, *extra_vars):
        self._extra_vars_node = extra_vars
        assert len(self._extra_vars) == len(extra_vars)
        self._start = start
        self._stop = stop
        vars = [start, stop] + list(extra_vars)
        #vars = list(extra_vars)
        return theano.Apply(self, vars, [tt.dscalar().type()])
    
    def perform(self, node, inputs, out):
        start, stop, *args = inputs
        val = quad(self._func, start, stop, args=tuple(args))[0]
        out[0][0] = np.array(val)
        
    def grad(self, inputs, grads):
        start, stop, *args = inputs
        out, = grads
        replace = dict(zip(self._extra_vars, args))
        
        replace_ = replace.copy()
        replace_[self._var] = start
        dstart = out * theano.clone(-self._expr, replace=replace_)
        
        replace_ = replace.copy()
        replace_[self._var] = stop
        dstop = out * theano.clone(self._expr, replace=replace_)

        grads = tt.grad(self._expr, self._extra_vars) 
        dargs = []
        for grad in grads:
            integrate = Integrate(grad, self._var, self._extra_vars)
            darg = out * integrate(start, stop, *args)
            dargs.append(darg)
   
        return [dstart, dstop] + dargs




y_obs = 8.3


start = theano.shared(1.)
stop = theano.shared(2.)
with pm.Model() as basic_model:
    a = pm.Uniform('a', 1.5, 3.5)
    b = pm.Uniform('b', 4., 6.)

    t = tt.dscalar('t')
    t.tag.test_value = np.zeros(())
   
    func = t**a + b
    integrate = Integrate(func, t, a, b)
    mu = integrate(start, stop, a, b)
    

    y = pm.Normal('y', mu=mu, sd=0.4, observed=y_obs)
    trace = pm.sample(1500, tune=500, cores=2, chains=2)

There is a small mistake in the gradient part:

...
for grad in grads:
    integrate = Integrate(grad, self._var, *self._extra_vars)
    ...

Also, the correct usage should be:

y_obs = 8.3

start = theano.shared(1.)
stop = theano.shared(2.)
with pm.Model() as basic_model:
    a = pm.Uniform('a', 1.5, 3.5)
    b = pm.Uniform('b', 4., 6.)

    # Define the function to integrate in plain theano
    t = tt.dscalar('t')
    t.tag.test_value = np.zeros(())
    a_ = tt.dscalar('a_')
    a_.tag.test_value = np.ones(())*2.
    b_ = tt.dscalar('b_')
    b_.tag.test_value = np.ones(())*5.
    func = t**a_ + b_
    integrate = Integrate(func, t, a_, b_)

    # Now we plug in the values from the model.
    # The `a_` and `b_` from above correspond to the `a` and `b` here.
    mu = integrate(start, stop, a, b)
    y = pm.Normal('y', mu=mu, sd=0.4, observed=y_obs)
    trace = pm.sample(1500, tune=500, cores=2, chains=2)

Thanks for your help. I updated my code according to your point and it is now correct, but I face another problem. With the NUTS and SMC step methods the code works, but with the Metropolis or Hamiltonian step methods it doesn't: in those cases the sampler only draws from the prior. That seems strange. Any help? I've attached my code below.

integration.py (1.6 KB)

test2.py (1.1 KB)

Metropolis is likely not suitable for this kind of problem (it is quite inefficient here), and to make HMC work you need to hand-tune a lot of parameters.
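
For reference, a minimal sketch of how a step method is chosen explicitly in pymc3, reusing the basic_model from above (by default pm.sample auto-assigns NUTS for continuous models):

with basic_model:
    # default: NUTS is auto-assigned
    trace_nuts = pm.sample(1500, tune=500, chains=2, cores=2)

    # explicitly request Metropolis (typically far less efficient for this model)
    trace_mh = pm.sample(1500, tune=500, chains=2, cores=2, step=pm.Metropolis())

    # HamiltonianMC usually needs hand-tuned step size / path length to work well
    trace_hmc = pm.sample(1500, tune=500, chains=2, cores=2, step=pm.HamiltonianMC())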

Perhaps this is a really dumb question …

Would the custom Theano Op Integrate work when either start or stop is infinite? I wonder because it calls scipy.integrate.quad, and that function can handle infinite integration limits.

Edit: I get the error TypeError: Cannot convert Type TensorType(float32, scalar) (of Variable a) into Type TensorType(float64, scalar). You can try to manually convert a into a TensorType(float64, scalar). when trying to run this.
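
For what it's worth, that error usually means theano's floatX is set to float32, so the model variable a is float32 while the Op's inner variables are float64 dscalars. A hedged sketch of two possible fixes, reusing the example model from above (which one applies depends on your configuration):

import theano
import theano.tensor as tt

# option 1: run everything in double precision
theano.config.floatX = 'float64'

# option 2: cast the offending variable where it is passed to the Op
mu = integrate(start, stop, tt.cast(a, 'float64'), b)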

Hi

This post has been very useful to me, and I was able to do the integrals that I need. However, I get an error when actually running a model calculation. It is the same error I get when I run @AH5719's scripts:

Traceback (most recent call last):
  File "/home/lorenzo/.local/lib/python3.8/site-packages/pymc3/theanof.py", line 80, in floatX
    return X.astype(theano.config.floatX)
AttributeError: 'list' object has no attribute 'astype'

During handling of the above exception, another exception occurred:

TypeError: float() argument must be a string or a number, not 'TensorVariable'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "test2.py", line 44, in <module>
    y = pm.Normal('y', mu=mu, sd=0.1, observed=y_obs)
  File "/home/lorenzo/.local/lib/python3.8/site-packages/pymc3/distributions/distribution.py", line 82, in __new__
    dist = cls.dist(*args, **kwargs)
  File "/home/lorenzo/.local/lib/python3.8/site-packages/pymc3/distributions/distribution.py", line 91, in dist
    dist.__init__(*args, **kwargs)
  File "/home/lorenzo/.local/lib/python3.8/site-packages/pymc3/distributions/continuous.py", line 488, in __init__
    self.mean = self.median = self.mode = self.mu = mu = tt.as_tensor_variable(floatX(mu))
  File "/home/lorenzo/.local/lib/python3.8/site-packages/pymc3/theanof.py", line 83, in floatX
    return np.asarray(X, dtype=theano.config.floatX)
  File "/opt/conda/lib/python3.8/site-packages/numpy/core/_asarray.py", line 83, in asarray
    return array(a, dtype, copy=False, order=order)
ValueError: setting an array element with a sequence.

So, it seems that pm.Normal does not accept a list of Integrate class objects. I’m running the code with python 3.8.5, pymc3 3.9.2 and theano 1.0.5. Does anybody have an idea how to solve this? Thanks a lot!

I found a workaround: just cast the list of integrals into a tensor variable, i.e. change the following line in the test2.py file from the previous post to

y = pm.Normal('y', mu=tt.as_tensor_variable(mu), sd=0.1, observed=y_obs)

Now, everything runs well.
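
For reference, a minimal sketch of the pattern being described, with a list of integrals stacked into a single tensor before it is handed to pm.Normal. The extra bounds and observed values below are made up for illustration, and integrate and stop are assumed to be the ones defined in the earlier example:

with pm.Model() as stacked_model:
    a = pm.Uniform('a', 1.5, 3.5)
    b = pm.Uniform('b', 4., 6.)
    starts = [theano.shared(s) for s in (0.5, 1.0, 1.5)]  # hypothetical lower bounds
    mu_list = [integrate(s, stop, a, b) for s in starts]  # one scalar Op output per interval
    mu = tt.as_tensor_variable(mu_list)                   # 1-d tensor, shape (3,); tt.stack(mu_list) works too
    y = pm.Normal('y', mu=mu, sd=0.1, observed=np.array([5.0, 8.3, 12.1]))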


Thank you for all the help, this was very useful. I am trying to generalize the function to the case where the extra parameters can be vectors or matrices, e.g. for the following case:

y_obs = 8.3
start = aesara.shared(1.)
stop = aesara.shared(2.)

with pm.Model() as basic_model:
    
    a = pm.Uniform('a', 1.5, 3.5)
    b = pm.Uniform(
        'b', 
        4., 6., 
        shape=(3)
    )

    # Define the function to integrate in plain pytensor
    t = at.dscalar('t')
    t.tag.test_value = np.zeros(())
    a_ = at.dscalar('a_')
    a_.tag.test_value = np.ones(())*2.
    
    b_ = at.dvector('b_')
    b_.tag.test_value = np.ones((3))*5.

    func = t**a_ + b_.sum()
    integrate = Integrate(
        # Function we're integrating
        func, 
        # variable we're integrating
        t, 
        # other variables
        a_, b_
    )

    # Now we plug in the values from the model.
    # The `a_` and `b_` from above correspond
    # to the `a` and `b` here.
    mu = integrate(
        start, 
        stop, 
        a, 
        b
    )
    y = pm.Normal(
        'y', 
        mu=mu, 
        sigma=0.4, 
        observed=y_obs
    )
    
    trace = pm.sample(
        1500, 
        tune=500, 
        cores=2, 
        chains=2,
        return_inferencedata=True
    )

When I run this with the Integrate Op as defined earlier in this thread, I get the following error:

ValueError: Integrate.grad returned a term with 0 dimensions, but 1 are required.

I assume we should specify somewhere that the gradient of b is 1-d now, but where should that happen? Thanks!

Hi folks,

I adapted the function to work with (either scalar- or vector-valued) functions f: \mathbb{R}^{d_1} \times \dots \times \mathbb{R}^{d_n} \rightarrow \mathbb{R}^k, such that the argument wrt which we're integrating is a scalar and the other arguments are either scalars or vectors.

import numpy as np
import pymc as pm
import pytensor as aesara
import pytensor.tensor as at

# Needed for integration
from scipy.integrate import quad, quad_vec
from pytensor.graph.op import Op
from pytensor.graph.basic import Apply
from pytensor import clone_replace

class Integrate(Op):
    
    # Class to perform integration of a scalar variable
    # on a bounded interval
    
    # Adapted from:
    # https://discourse.pymc.io/t/custom-theano-op-to-do-numerical-integration/734/12
    # With some modifications!
    
    def __init__(self, expr, var, *extra_vars):
        """
        Parameters
        ----------
        expr: Aesara Variable
            The expression encoding the output
        var: Aesara Variable
            The input variable
        """
        super().__init__()
        
        # function we're integrating
        self._expr = expr
        
        # input var we're integrating over
        self._var = var
        
        # other variables
        self._extra_vars = extra_vars
        
        # transform expression into callable function
        self._func = aesara.function(
            # a list with all the inputs
            [var] + list(extra_vars),
            # output
            self._expr,
            on_unused_input='ignore'
        )
    
    def make_node(self, start, stop, *extra_vars):
        
        self._extra_vars_node = extra_vars
        
        # make sure that the same number of extra variables
        # are passed here as were specified when defining the Op
        assert len(self._extra_vars) == len(extra_vars)
        
        # Define the bounds of integration
        self._start = start
        self._stop = stop
                
        # return an Apply instance with the input and output Variable
        return Apply(
            # op: The operation that produces `outputs` given `inputs`.
            op=self, 
            # inputs: The arguments of the expression modeled by the `Apply` node.
            inputs=[start, stop] + list(extra_vars), 
            # outputs: The outputs of the expression modeled by the `Apply` node.
            # NOTE: This is a scalar if self._expr is a scalar,
            # and a vector if self._expr is a vector. Etc.
            outputs=[self._expr.type()]
        )
    
    def perform(self, node, inputs, out):
        """
        Out is the output storage.
        Inputs are passed by value.
        A single output is returned indirectly 
        as the first element of single-element lists (out)
        
        NOTE: There's a restriction, namely the variable to integrate
        has to be a scalar, even though the other variables can be vectors.
        
        Parameters
        ----------
        node: Apply node
            The output of make_node
        inputs: List of data
            The data can be operated on with numpy
        out: List
            output_storage is a list of storage cells where the output 
            is to be stored. There is one storage cell for each output of the Op. 
            The data put in output_storage must match the type of the symbolic output. 
        """

        # Runs the computation in python
        start, stop, *args = inputs
                
        if self._expr.ndim == 0:
            val = quad(
                self._func, 
                start, 
                stop, 
                args=tuple(args)
            )[0]
        elif self._expr.ndim == 1:
            # if the function is vector-valued
            # (e.g., the gradient of a vector),
            # use quad_vec
            val = quad_vec(
                self._func,
                start,
                stop,
                args=tuple(args)
            )[0]
        else:
            shape = self._func(
                start,
                *args
            ).shape
            
            def helpfunc(*args):
                return self._func(*args).flatten()
            
            val = quad_vec(
                helpfunc,
                start,
                stop,
                args=tuple(args)
            )[0].reshape(shape)
        
        # in-place modification of "out".
        # out is a single-element list
        out[0][0] = np.array(val)
        
    def grad(self, inputs, grads):
        """
        NOTE: This function does not calculate the gradient
        but rather implements part of the chain rule,
        i.e. multiplies the grads by the gradient wrt the cost.
        See https://aesara.readthedocs.io/en/latest/extending/op.html
        for an explanation
        
        Inputs in this case contains: 
        [lower integration bound, upper integration bound, ...other variables of function]
        """
        
        # unpack the input
        start, stop, *args = inputs
        out, = grads
        
        # dictionary with the extra variables as keys
        # and the extra variables in "inputs" as values
        replace = dict(zip(
            self._extra_vars, 
            args
        ))
        
        # The derivative of the integral wrt the upper limit of integration
        # is just the value of the function at that limit
        # (for lower limit, it's minus the function)
        # See e.g.,
        # https://math.stackexchange.com/questions/984111/differentiating-with-respect-to-the-limit-of-integration
        replace_ = replace.copy()
        replace_[self._var] = start
        dstart = out * clone_replace(
            # Clone a graph and replace subgraphs within it. 
            # It returns a copy of the initial subgraph with the corresponding
            # substitutions.
            -self._expr, 
            # Dictionary describing which subgraphs should be replaced by what.
            replace=replace_
        )
        
        replace_ = replace.copy()
        replace_[self._var] = stop
        dstop = out * clone_replace(
            self._expr, 
            replace=replace_
        )

        # calculate the symbolic gradient of self._expr
        # wrt each extra variable.
        # This can be done because they're symbolic aesara variables!
        # This corresponds to the gradient of the expression
        # *inside* the integral (the inner part of Leibniz'
        # integral rule)
        grads = at.jacobian(
            # cost
            self._expr, 
            # wrt
            self._extra_vars
        )
        
        dargs = []
        # loop over the gradients of the extra vars
        for grad in grads:
                        
            # define an Apply node
            # for that gradient
            integrate = Integrate(
                grad, 
                # variable we're integrating over
                self._var, 
                *self._extra_vars
            )
            
            # Apply Leibniz' integral rule:
            # call integrate, which evaluates
            # the integral of the gradient.
            # And then multiply with previous gradient
            # that was passed in the input.
            # NOTE: This is not actually doing the operation,
            # but rather calling make_node, which *creates the node*
            # that does the operation
            darg = at.dot(
                integrate(
                    start, stop, 
                    *args
                ).T, 
                out  
            )
            
            dargs.append(darg)
        
        # return a list with one Variable for each input in inputs
        return [dstart, dstop] + dargs

I changed a few things:

  • Apply in the return value of make_node now gets outputs=[self._expr.type()], since the expression can be any shape.
  • perform adapts depending on the shape of the output of self._func. If the output is a scalar, it uses quad. If it is a vector, it uses quad_vec. Otherwise, it defines a helper function that flattens the output of self._func, runs quad_vec on that, and finally reshapes the result back into the original shape (a short standalone quad/quad_vec sketch follows after this list).
  • The previous point is useful because in grad() we integrate, with respect to self._var, the Jacobian of self._expr with respect to each of the extra variables. Since some of the extra variables can be vectors and self._expr can be vector-valued, the Jacobian can be up to 2-dimensional.
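
For reference, a standalone sketch of the quad / quad_vec dispatch described in the list above, using plain numpy functions (the function names here are made up for illustration; quad_vec is called through a lambda instead of args= so it also runs on older SciPy versions):

import numpy as np
from scipy.integrate import quad, quad_vec

def f_scalar(t, a):
    # scalar-valued integrand: R -> R
    return t ** a

def f_vector(t, a, b):
    # vector-valued integrand: R -> R^3 (b is a length-3 array)
    return t ** a + b

# scalar case: quad returns (value, error_estimate)
val_scalar = quad(f_scalar, 1.0, 2.0, args=(2.0,))[0]

# vector case: quad_vec integrates each output component
b = np.array([4.0, 5.0, 6.0])
val_vector = quad_vec(lambda t: f_vector(t, 2.0, b), 1.0, 2.0)[0]

print(val_scalar, val_vector)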

I tested the code with a modified version of the example above, where one of the inputs (b) is now a vector, and the function to integrate, i.e., f(a, b, t) = t^a + b, is vector valued. We are calculating \int_1^2 f(a, b, t) dt:

### Example of usage

y_obs = np.array([8.3, 8.0, 7.8])
start = aesara.shared(1.)
stop = aesara.shared(2.)

with pm.Model() as basic_model:
    
    a = pm.Uniform('a', 1.5, 3.5)
    b = pm.Uniform(
        'b', 
        4., 6., 
        shape=(3)
    )

    # Define the function to integrate in plain pytensor
    t = at.dscalar('t')
    t.tag.test_value = np.zeros(())
    
    a_ = at.dscalar('a_')
    a_.tag.test_value = np.ones(())*2.
    
    b_ = at.dvector('b_')
    b_.tag.test_value = np.ones((3))*5.
    
    func = t**a_ + b_
    integrate = Integrate(
        # Function we're integrating
        func, 
        # variable we're integrating
        t, 
        # other variables
        a_, b_
    )

    # Now we plug in the values from the model.
    # The `a_` and `b_` from above correspond
    # to the `a` and `b` here.
    mu = integrate(
        start, 
        stop, 
        a, 
        b
    )
    y = pm.Normal(
        'y', 
        mu=mu, 
        sigma=0.4, 
        observed=y_obs
    )

with basic_model:
    trace = pm.sample(
        1500, 
        tune=500, 
        cores=2, 
        chains=2,
        return_inferencedata=True
    )

Any feedback would be greatly appreciated - there might be some mistakes. I have also used this for a more complex model and it’s extremely slow, so it would be awesome to find ways of making it faster. Thanks!

Looks correct. You can set a flag self._func.trust_input=True right after you first define _func. That will skip checking the types of the inputs, which can cause quite some overhead when calling the function.
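
For anyone trying this, a tiny self-contained illustration of what the flag does, on a toy function rather than the Integrate Op itself: once trust_input is set, the compiled function skips input validation, so the caller must already pass numpy values of exactly the right dtype and shape.

import numpy as np
import pytensor
import pytensor.tensor as at

x = at.dscalar('x')
f = pytensor.function([x], x ** 2)
f.trust_input = True          # skip input checking/conversion on every call
print(f(np.asarray(3.0)))     # must already be a 0-d float64 array -> prints 9.0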

If you are interested we could add this as a utility directly in PyTensor (our current backend/fork of Aesara).

We would probably tweak it slightly in that case. Usually we use FunctionGraph to represent inner graphs and only compile them into a function later on. There is an example of a new Loop Op we are building that you can have a look at: Implement new Loop and Scan operators by ricardoV94 · Pull Request #191 · pymc-devs/pytensor · GitHub (specifically, look at pytensor/loop/op.py::Loop).

Thank you for the help! I tried the advice re trust_input.

Adding self._func.trust_input=True in __init__, right after the definition of self._func, makes the kernel die when running the example in a Jupyter notebook. When running the code from the command line, it ends with a segmentation fault.

Any additional ideas on how to make it faster would also be great!

That means some of the inputs are not of the right type.

I am struggling to see what the problem could be: a, b, and t all seem to be the right type to me in the example above. I tried with a scalar b and it still ends in a segmentation fault. Any thoughts / hints?

Can you print their types (and dtypes) in the perform method (before you pass them to _func)? That should make it pretty clear. Maybe you have Python literals you need to convert to numpy arrays.

I added:

print(inputs)
print([type(a) for a in inputs])
print([a.dtype for a in inputs])

to perform, and when running the example above the output is:

Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
[array(1.), array(2.), array(2.08049756), array([5.46187651, 5.20988144, 4.94399615])]
[<class 'numpy.ndarray'>, <class 'numpy.ndarray'>, <class 'numpy.ndarray'>, <class 'numpy.ndarray'>]
[dtype('float64'), dtype('float64'), dtype('float64'), dtype('float64')]
Segmentation fault

The inputs are of the expected shapes and correspond to start, stop, a, and b.

Unless I am missing something, this looks as it should, so I guess the problem must be somewhere else (?).

What are the node input types? If setting trust_input to True is causing a segfault, it must be that the function was previously converting the inputs to other types (maybe float32?)

Everything seems to be right:

Adding this to __init__:

print("Input types: ", [i.dtype for i in [var] + list(extra_vars)])
print("Output type: ", self._expr.type, self._expr.dtype)

prints out:

Input types:  ['float64', 'float64', 'float64']
Output type:  TensorType(float64, (?,)) float64

I also added this to make_node:

print("Make node input types: ", [
    (i.type, i.dtype)
    for i in
    [self._start, self._stop] + list(self._extra_vars_node)
])
print(
    "Make node output type: ", 
    self._expr.type()
)

which prints:

Make node input types:  [(TensorType(float64, ()), 'float64'), (TensorType(float64, ()), 'float64'), (TensorType(float64, ()), 'float64'), (TensorType(float64, (3,)), 'float64')]
Make node output type:  <TensorType(float64, (?,))>

I thought the problem could be that it doesn’t know the exact output shape, but passing it directly doesn’t seem to solve the problem either.

Hmm, at this point I would have to go through self._func.__call__ and see what changes when trust_input is True. Anyway, that's likely a rabbit hole not worth pursuing.