Setting hard limits for the estimated value

I have a pretty basic model setup to model the formula y = a + b0x + b1x, where y is velocity and x is load.

Basically, the higher the load, the lower velocity. Until it has no velocity.

The issue I’m having is that on rare occasions y Is estimated to be negative. Now, this means it would move in reverse, which is physically impossible. There’s also an upper limit, the velocity with 0 load, but that limit is unknown, whereas the other is known.

How do I include this hard stop for 0 velocity?

My suspicion is that the fact that I’m using the normal distribution for the likelihood, and it could be handled by using TruncatedNormal.

Am I on the right track or is there something else I could do?

To further complicate things, since I’m standardizing the values in the mode, 0 velocity is seen as something like -2.68 for the model. But if I would use the TruncatedNormal, what I think would work is to standardize 0 as well.

Truncated normal could work. You can also look into bounding arbitrary parameters if you wanted to constrain something that wasn’t normally distributed. Do be aware that placing hard constraints on parameter values may cause difficulties during sampling.

1 Like

Aah! Thanks for sharing that :relaxed:

Unfortunately what TruncatedNormal gave me was errors :disappointed:

/usr/local/lib/python3.7/dist-packages/scipy/stats/_continuous_distns.py in _norm_logcdfprime(z)
   7048 def _norm_logcdfprime(z):
   7049     # derivative of special.log_ndtr
-> 7050     assert np.abs(z) > TRUNCNORM_TAIL_X/2
   7051     lhs = -z - 1/z
   7052     denom_cons = 1/z**2

AssertionError: 

Can we see the code that is generating that error?

Absolutely!

Model function:

def quadratic_fit(y, x, weight, filepath = None, **kwargs):
    with pm.Model() as model:
        x_shared = pm.Data('x', x)
        y_shared = pm.Data('y', y)
        weight_shared = pm.Data('weight', weight)

        # Standardize inputs
        x_mean = tt.mean(x_shared)
        x_sd = tt.std(x_shared)
        x_scaled = (x_shared - x_mean)/x_sd
        
        y_mean = tt.mean(y_shared)
        y_sd = tt.std(y_shared)
        y_scaled = (y_shared - y_mean)/y_sd

        # Hyperparameters
        alpha_scaled = pm.Normal('alpha_scaled', mu = 0, sigma = 0.2)
        beta0_scaled_pos = pm.Gamma('beta0_scaled_pos', alpha = 2, beta = 0.5)
        beta1_scaled_pos = pm.Gamma('beta1_scaled_pos', alpha = 1, beta = 0.5)
        sigma_scaled = pm.HalfNormal('sigma_scaled', sigma = 0.5)

        # Transform parameters
        beta0_scaled = -beta0_scaled_pos
        beta1_scaled = -beta1_scaled_pos
        sigma_scaled_weighted = pm.Deterministic('sigma_scaled_weighted', 1 / (weight_shared / sigma_scaled**2)**0.5)

        # Expected value
        y_pred_scaled = alpha_scaled + beta0_scaled*x_scaled + beta1_scaled*x_scaled**2

        likelihood_scaled = pm.TruncatedNormal('likelihood_scaled',
                                               mu = y_pred_scaled,
                                               sigma = sigma_scaled_weighted,
                                               lower = (0 - y_mean)/y_sd,
                                               observed = y_scaled)
        
        # Reverse standardization
        alpha = pm.Deterministic('alpha', y_sd * (alpha_scaled - beta0_scaled*x_mean/x_sd + beta1_scaled*x_mean**2/x_sd**2) + y_mean)
        beta0 = pm.Deterministic('beta0', beta0_scaled*y_sd/x_sd - 2*beta1_scaled*x_mean*y_sd/x_sd**2)
        beta1 = pm.Deterministic('beta1', beta1_scaled*y_sd/x_sd**2)

        # Run inference
        trace = pm.sample(target_accept = 0.98,
                          init = 'advi',
                          progressbar = True,
                          return_inferencedata = False)
        
        prior_predictions = pm.sample_prior_predictive()
        posterior_predictions = pm.fast_sample_posterior_predictive(trace)
        inference_data = az.from_pymc3(trace = trace,
                                       prior = prior_predictions,
                                       posterior_predictive = posterior_predictions)
        
        if filepath:
            print(f'Saving posterior to: {filepath}')
            inference_data.posterior.to_netcdf(filepath)

    return inference_data

Data and function call:

x = [20, 30, 40, 45, 50, 55, 57.5]
y = [1.21, 1.02, 0.77, 0.63, 0.49, 0.38, 0.5]
weight = np.ones(len(y))
idata = quadratic_fit(y, x, weight)

idata

Full output:

WARNING (theano.tensor.opt): Optimization Warning: The Op erfcx does not provide a C implementation. As well as being potentially slow, this also disables loop fusion.
Auto-assigning NUTS sampler...
Initializing NUTS using advi...

 6.92% [13844/200000 00:02<00:35 Average Loss = 11.958]
Convergence achieved at 14900
Interrupted at 14,899 [7%]: Average Loss = 101.78
Sequential sampling (2 chains in 1 job)
NUTS: [sigma_scaled, beta1_scaled_pos, beta0_scaled_pos, alpha_scaled]

 100.00% [2000/2000 00:09<00:00 Sampling chain 0, 0 divergences]

 100.00% [2000/2000 00:09<00:00 Sampling chain 1, 0 divergences]
Sampling 2 chains for 1_000 tune and 1_000 draw iterations (2_000 + 2_000 draws total) took 19 seconds.
The number of effective samples is smaller than 25% for some parameters.
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-11-8645b6e6f1c9> in <module>()
      2 y = [1.21, 1.02, 0.77, 0.63, 0.49, 0.38, 0.5]
      3 weight = np.ones(len(y))
----> 4 idata = quadratic_fit(y, x, weight)
      5 
      6 idata

15 frames
<ipython-input-9-a71118ba6d19> in quadratic_fit(y, x, weight, filepath, **kwargs)
     45                           return_inferencedata = False)
     46 
---> 47         prior_predictions = pm.sample_prior_predictive()
     48         posterior_predictions = pm.fast_sample_posterior_predictive(trace)
     49         inference_data = az.from_pymc3(trace = trace,

/usr/local/lib/python3.7/dist-packages/pymc3/sampling.py in sample_prior_predictive(samples, model, var_names, random_seed)
   1942     names = get_default_varnames(vars_, include_transformed=False)
   1943     # draw_values fails with auto-transformed variables. transform them later!
-> 1944     values = draw_values([model[name] for name in names], size=samples)
   1945 
   1946     data = {k: v for k, v in zip(names, values)}

/usr/local/lib/python3.7/dist-packages/pymc3/distributions/distribution.py in draw_values(params, point, size)
    789                     # This may fail for autotransformed RVs, which don't
    790                     # have the random method
--> 791                     value = _draw_value(next_, point=point, givens=temp_givens, size=size)
    792                     givens[next_.name] = (next_, value)
    793                     drawn[(next_, size)] = value

/usr/local/lib/python3.7/dist-packages/pymc3/distributions/distribution.py in _draw_value(param, point, givens, size)
    969                 dist_tmp.shape = distshape
    970                 try:
--> 971                     return dist_tmp.random(point=point, size=size)
    972                 except (ValueError, TypeError):
    973                     # reset shape to account for shape changes

/usr/local/lib/python3.7/dist-packages/pymc3/distributions/continuous.py in random(self, point, size)
    712             upper=upper,
    713             dist_shape=self.shape,
--> 714             size=size,
    715         )
    716 

/usr/local/lib/python3.7/dist-packages/pymc3/distributions/distribution.py in generate_samples(generator, *args, **kwargs)
   1114         )
   1115     if dist_bcast_shape[: len(size_tup)] == size_tup:
-> 1116         samples = generator(size=dist_bcast_shape, *args, **kwargs)
   1117     else:
   1118         samples = generator(size=size_tup + dist_bcast_shape, *args, **kwargs)

/usr/local/lib/python3.7/dist-packages/pymc3/distributions/continuous.py in _random(self, mu, sigma, lower, upper, size)
    722         """
    723         return stats.truncnorm.rvs(
--> 724             a=(lower - mu) / sigma, b=(upper - mu) / sigma, loc=mu, scale=sigma, size=size
    725         )
    726 

/usr/local/lib/python3.7/dist-packages/scipy/stats/_distn_infrastructure.py in rvs(self, *args, **kwds)
    978         # by _rvs().
    979         self._size = size
--> 980         vals = self._rvs(*args)
    981 
    982         vals = vals * scale + loc

/usr/local/lib/python3.7/dist-packages/scipy/stats/_distn_infrastructure.py in _rvs(self, *args)
    911         ## Use basic inverse cdf algorithm for RV generation as default.
    912         U = self._random_state.random_sample(self._size)
--> 913         Y = self._ppf(U, *args)
    914         return Y
    915 

/usr/local/lib/python3.7/dist-packages/scipy/stats/_continuous_distns.py in _ppf(self, q, a, b)
   7161 
   7162     def _ppf(self, q, a, b):
-> 7163         return _truncnorm_ppf(q, a, b)
   7164 
   7165     def _munp(self, n, a, b):

/usr/local/lib/python3.7/dist-packages/scipy/stats/_continuous_distns.py in vf_wrapper(*args)
   6931         @functools.wraps(f)
   6932         def vf_wrapper(*args):
-> 6933             return vf(*args)
   6934         return vf_wrapper
   6935     return vectorize_decorator

/usr/local/lib/python3.7/dist-packages/numpy/lib/function_base.py in __call__(self, *args, **kwargs)
   2106             vargs.extend([kwargs[_n] for _n in names])
   2107 
-> 2108         return self._vectorize_call(func=func, args=vargs)
   2109 
   2110     def _get_ufunc_and_otypes(self, func, args):

/usr/local/lib/python3.7/dist-packages/numpy/lib/function_base.py in _vectorize_call(self, func, args)
   2190                       for a in args]
   2191 
-> 2192             outputs = ufunc(*inputs)
   2193 
   2194             if ufunc.nout == 1:

/usr/local/lib/python3.7/dist-packages/scipy/stats/_continuous_distns.py in _truncnorm_ppf(q, a, b)
   7090 
   7091     if np.isinf(b):
-> 7092         x = -_norm_ilogcdf((np.log1p(-q) + _norm_logsf(a)))
   7093         return x
   7094     elif np.isinf(a):

/usr/local/lib/python3.7/dist-packages/scipy/stats/_continuous_distns.py in _norm_ilogcdf(y)
   7069     z = -np.sqrt(-2*(y + np.log(2*np.pi)/2))
   7070     for _ in range(3):
-> 7071         z = z - (_norm_logcdf(z) - y) / _norm_logcdfprime(z)
   7072     return z
   7073 

/usr/local/lib/python3.7/dist-packages/scipy/stats/_continuous_distns.py in _norm_logcdfprime(z)
   7048 def _norm_logcdfprime(z):
   7049     # derivative of special.log_ndtr
-> 7050     assert np.abs(z) > TRUNCNORM_TAIL_X/2
   7051     lhs = -z - 1/z
   7052     denom_cons = 1/z**2

AssertionError: 

That code seems to be working for me. I running pymc 3.11.2. What are you working with?

Strange, same here. Running it on Colab, might be some issue with that