Custom distribution for Truncated Gamma

I am trying to create a custom distribution for Truncated Gamma, as shown in the code below.

However, I got the following error.(logp)

from pymc3.distributions.dist_math import bound
from pymc3.distributions import draw_values, generate_samples
import theano.tensor as tt
import scipy.stats.distributions
import scipy.stats

from pymc3.distributions.dist_math import gammaln, logpow


class TruncatedGamma(pm.Continuous):
    def __init__(self, alpha, beta, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.alpha = alpha = tt.as_tensor_variable(alpha)
        self.beta = beta = tt.as_tensor_variable(beta)

    def tg_cdf(self, alpha, beta, size=None):
        alpha = np.asarray(alpha)
        beta = np.asarray(beta)
        dist = scipy.stats.distributions.gamma(alpha, scale=1.0/beta)
        
        # truncate values
        min_val = 40
        lower_cdf = dist.cdf(min_val)
        upper_cdf = 1
        nrm = upper_cdf - lower_cdf
        sample = np.random.random(size=size) * nrm + lower_cdf

        return dist.ppf(sample)

    def random(self, point=None, size=None):
        alpha, beta = draw_values([self.alpha, self.beta], point=point, size=size)
        samples = generate_samples(
            self.tg_cdf
            , alpha
            , 1.0 / beta
            , dist_shape=self.shape
            , size=size
        )
        return samples

    def logp(self, value):
        alpha = self.alpha
        beta = self.beta
        
        p = logpow(beta, alpha) \
            + logpow(value, alpha - 1) \
            - beta * value - gammaln(alpha) \
            - pm.math.log(1 - scipy.stats.gamma.cdf(40, alpha, scale=1.0/beta)) # error

        log_prob = bound(
            p,
            alpha > 0, beta > 0, value >= 0)
        return log_prob
    
    

import pymc3 as pm

with pm.Model() as model:
    alpha_t = pm.Uniform('alpha_t', 0, 100)
    beta_t = pm.Uniform('beta_t', 0, 100)
    data_obs = TruncatedGamma('data_obs', alpha=alpha_t, beta=beta_t, observed=y)

    
with model:
    trace = pm.sample(5000, chains=4, start=pm.find_MAP(), return_inferencedata=False, idata_kwargs={"log_likelihood": False})
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
TypeError: float() argument must be a string or a number, not 'TensorVariable'

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
Input In [11], in <cell line: 3>()
      4     alpha_t = pm.Uniform('alpha_t', 0, 100)
      5     beta_t = pm.Uniform('beta_t', 0, 100)
----> 6     data_obs = TruncatedGamma('data_obs', alpha=alpha_t, beta=beta_t, observed=y)
      9 with model:
     10     trace = pm.sample(5000, chains=4, start=pm.find_MAP(), return_inferencedata=False, idata_kwargs={"log_likelihood": False})

File ~/.local/lib/python3.8/site-packages/pymc3/distributions/distribution.py:125, in Distribution.__new__(cls, name, *args, **kwargs)
    123 else:
    124     dist = cls.dist(*args, **kwargs)
--> 125 return model.Var(name, dist, data, total_size, dims=dims)

File ~/.local/lib/python3.8/site-packages/pymc3/model.py:1177, in Model.Var(self, name, dist, data, total_size, dims)
   1175 else:
   1176     with self:
-> 1177         var = ObservedRV(
   1178             name=name,
   1179             data=data,
   1180             distribution=dist,
   1181             total_size=total_size,
   1182             model=self,
   1183         )
   1184     self.observed_RVs.append(var)
   1185     if var.missing_values:

File ~/.local/lib/python3.8/site-packages/pymc3/model.py:1828, in ObservedRV.__init__(self, type, owner, index, name, data, distribution, total_size, model)
   1825 data = as_tensor(data, name, model, distribution)
   1827 self.missing_values = data.missing_values
-> 1828 self.logp_elemwiset = distribution.logp(data)
   1829 # The logp might need scaling in minibatches.
   1830 # This is done in `Factor`.
   1831 self.logp_sum_unscaledt = distribution.logp_sum(data)

Input In [8], in TruncatedGamma.logp(self, value)
     42 alpha = self.alpha
     43 beta = self.beta
     45 p = logpow(beta, alpha) \
     46     + logpow(value, alpha - 1) \
     47     - beta * value - gammaln(alpha) \
---> 48     - pm.math.log(1 - scipy.stats.gamma.cdf(40, alpha, scale=1.0/beta))
     50 log_prob = bound(
     51     p,
     52     alpha > 0, beta > 0, value >= 0)
     53 return log_prob

File /opt/layers/datascience/site-packages/scipy/stats/_distn_infrastructure.py:1844, in rv_continuous.cdf(self, x, *args, **kwds)
   1842 _a, _b = self._get_support(*args)
   1843 dtyp = np.find_common_type([x.dtype, np.float64], [])
-> 1844 x = np.asarray((x - loc)/scale, dtype=dtyp)
   1845 cond0 = self._argcheck(*args) & (scale > 0)
   1846 cond1 = self._open_support_mask(x, *args) & (scale > 0)

File /opt/layers/datascience/site-packages/numpy/core/_asarray.py:83, in asarray(a, dtype, order)
     14 @set_module('numpy')
     15 def asarray(a, dtype=None, order=None):
     16     """Convert the input to an array.
     17 
     18     Parameters
   (...)
     81 
     82     """
---> 83     return array(a, dtype, copy=False, order=order)

ValueError: setting an array element with a sequence.

I think the reason is that I have used scipy in the part where the cumulative distribution function of the gamma distribution is calculated. To calculate the cumulative distribution function of the gamma function, an incomplete gamma function is required, but theano does not seem to have a method to obtain an incomplete gamma function.

In this case, would it be difficult to create Truncated Gamma distribution?

If you upgrade to PyMC 4.4 or 5.0, you can just do pm.Truncated("name", pm.Gamma.dist(alpha, beta), lower=40, upper=None): pymc.Truncated — PyMC 5.0.0 documentation

4 Likes

Oh…I didn’t know such a useful class was implemented in PyMC 4.4. Very helpful, thank you so much!!!

Is the pm.Truncated expected to work on observed data as well? I get a NotImplementedError ‘Truncation not implemented for SymbolicRandomVariable’ and it suggests making CustomDist instead

Truncated should work for observed data. What distribution are you trying to truncate? Also make sure to try on the latest version of PyMC as we have extended the functionality in recent times

I’m using an ExGaussian distribution. My pymc version is ‘5.15.1’.

I upgraded to pymc 5.16.2 and am still getting the same error for the ExGaussian distribution. Is there a way to ensure this distribution is a pure RandomVariable, or is there another way to truncate the distribution?

It’s a current limitation. You can workaround it by wrapping in a CustomDist:

import pymc as pm

with pm.Model() as m:
    mu = 0
    sigma = 1
    nu = 1
    
    exgaussian_dist = pm.CustomDist.dist(
        mu,
        sigma,
        nu,
        dist=lambda mu, sigma, nu, size: pm.ExGaussian.dist(mu, sigma, nu, size=size),
    )
    truncated = pm.Truncated("truncated", exgaussian_dist, lower=1, upper=3)

Should be fixed by Allow more distributions to be truncated by ricardoV94 · Pull Request #7476 · pymc-devs/pymc · GitHub