Model oversamples low probability event

I am trying to create a model where you provide an initial probability vector, move it to log space, add subject-specific and trial-specific noise, and then convert back to probability space, where you sample n subjects with x trials each. I am currently observing that my model oversamples the low-probability event by ~1-2%, depending on the noise I add. I suspect this is because, if the original low probability is .1, the increase due to noise is disproportionate to the decrease due to noise. How can I account for that in my model?
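For example, here is a quick numpy check (outside the model) of the asymmetry I mean: symmetric Gaussian noise on the log-odds scale, mapped back through the sigmoid, inflates the average recovered probability above the original 0.1.

import numpy as np

rng = np.random.default_rng(0)
p = 0.1
log_odds = np.log(p / (1 - p))                 # logit of the low-probability state
noise = rng.normal(0, 0.5, size=100_000)       # symmetric noise, same scale as below
back = 1 / (1 + np.exp(-(log_odds + noise)))   # sigmoid back to probability space
print(back.mean())                             # ~0.11 rather than 0.10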

Here is what I have done so far:

import pymc as pm
import numpy as np


n_subjects = 100
n_trials = 100

n_states = 3

subject_noise = 0.5
trial_noise = 0.5

state_probabilities = np.array([0.1, 0.4, 0.5])



coords = {
    "subject": np.arange(n_subjects),
    "trial": np.arange(n_trials),
    "state": np.arange(n_states)
}


with pm.Model(coords=coords) as model:

    initial_log_p = pm.Deterministic("initial_log_p", pm.math.log(state_probabilities / (1 - state_probabilities)), dims="state")
    
    subj_noise = pm.Normal("subj_noise",0,subject_noise,dims=("subject","state"))
    
    trial_noise = pm.Normal("trial_noise",0,trial_noise, dims=("subject", "trial", "state"))

    total_noise = pm.Deterministic("total_noise", subj_noise[:, None, :] + trial_noise, dims=("subject", "trial", "state"))

    final_log = pm.Deterministic("final_log", initial_log_p + total_noise, dims=("subject", "trial", "state"))
    
    final_p_non_sum = pm.Deterministic("final_p_non_sum", pm.math.sigmoid(final_log), dims=("subject", "trial", "state"))
    
    dirichlet_sample = pm.Dirichlet("dirichlet_sample", a=final_p_non_sum*10, dims=("subject", "trial", "state"))

    sample_state = pm.Multinomial("sample_state", n=1, p=dirichlet_sample, dims=("subject", "trial", "state"))

    prior = pm.sample_prior_predictive(samples=1)

((prior.prior["sample_state"].data * [1,2,3]).sum(axis=-1)[0][0]  == 1).mean()

Hi!

One problem with your code is that it uses the logit/sigmoid transform, which only applies to two states, when you actually have three. Originally the probabilities add up to 1, but once you add the noise and map each state back through the sigmoid independently, they don't anymore. Look at this:

import pymc as pm
import numpy as np


n_subjects = 100
n_trials = 100

n_states = 3

subject_noise = 0.5
trial_noise = 0.5

state_probabilities = np.array([0.1, 0.4, 0.5])


coords = {
    "subject": np.arange(n_subjects),
    "trial": np.arange(n_trials),
    "state": np.arange(n_states),
}


with pm.Model(coords=coords) as model:
    initial_log_p = pm.Deterministic(
        "initial_log_p", pm.math.log(state_probabilities / (1 - state_probabilities)), dims="state"
    )

    subj_noise = pm.Normal("subj_noise", 0, subject_noise, dims=("subject", "state"))
    trial_noise = pm.Normal("trial_noise", 0, trial_noise, dims=("subject", "trial", "state"))

    total_noise = pm.Deterministic(
        "total_noise", subj_noise[:, None, :] + trial_noise, dims=("subject", "trial", "state")
    )

    # total_noise = np.zeros((n_subjects, n_trials, n_states))  # uncomment to zero out the noise as a check

    final_log = pm.Deterministic(
        "final_log", initial_log_p + total_noise, dims=("subject", "trial", "state")
    )

    final_p_non_sum = pm.Deterministic(
        "final_p_non_sum", pm.math.sigmoid(final_log), dims=("subject", "trial", "state")
    )

    dirichlet_sample = pm.Dirichlet(
        "dirichlet_sample", a=final_p_non_sum * 10, dims=("subject", "trial", "state")
    )

    sample_state = pm.Multinomial(
        "sample_state", n=1, p=dirichlet_sample, dims=("subject", "trial", "state")
    )

    prior = pm.sample_prior_predictive(samples=1)

prior.prior["final_p_non_sum"].sum("state")
array([[[[1.10843649, 0.86817036, 1.40206735, ..., 1.22028226,
          1.01429214, 1.28709837],
         [1.03685059, 0.91259717, 0.97796022, ..., 0.81577098,
          0.9989243 , 1.06497121],
         [0.80130395, 0.85805041, 1.22585814, ..., 0.98804735,
          1.35885752, 0.97182725],
         ...,
         [0.75566216, 0.97299343, 1.09703658, ..., 0.89944494,
          0.91053022, 0.56049512],
         [0.8237814 , 1.00179595, 0.79598627, ..., 0.84665124,
          0.80400659, 0.61714707],
         [0.75710897, 0.7109351 , 0.62665   , ..., 0.74708109,
          0.71287813, 1.13028959]]]])

In your case, you could consider the softmax function instead; check this: probability - Invert the softmax function - Mathematics Stack Exchange.
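Just to illustrate why the plain log works as the inverse here (a quick numpy check, not part of the model): applying softmax to log(p) recovers p exactly, so before any noise is added nothing changes.

import numpy as np

p = np.array([0.1, 0.4, 0.5])
logits = np.log(p)
print(np.exp(logits) / np.exp(logits).sum())   # [0.1 0.4 0.5] -> softmax undoes the log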

And the code would be

import pymc as pm
import numpy as np


n_subjects = 100
n_trials = 100

n_states = 3

subject_noise = 0.3
trial_noise = 0.3

state_probabilities = np.array([0.1, 0.4, 0.5])


coords = {
    "subject": np.arange(n_subjects),
    "trial": np.arange(n_trials),
    "state": np.arange(n_states),
}


with pm.Model(coords=coords) as model:
    initial_log_p = pm.Deterministic(
        "initial_log_p", pm.math.log(state_probabilities), dims="state"
    )

    subj_noise = pm.Normal("subj_noise", 0, subject_noise, dims=("subject", "state"))
    trial_noise = pm.Normal("trial_noise", 0, trial_noise, dims=("subject", "trial", "state"))

    total_noise = pm.Deterministic(
        "total_noise", subj_noise[:, None, :] + trial_noise, dims=("subject", "trial", "state")
    )

    final_log = pm.Deterministic(
        "final_log", initial_log_p + total_noise, dims=("subject", "trial", "state")
    )

    final_p_non_sum = pm.Deterministic(
        "final_p_non_sum", pm.math.softmax(final_log, axis=-1), dims=("subject", "trial", "state")
    )

    dirichlet_sample = pm.Dirichlet(
        "dirichlet_sample", a=final_p_non_sum * 1, dims=("subject", "trial", "state")
    )

    sample_state = pm.Multinomial(
        "sample_state", n=1, p=dirichlet_sample, dims=("subject", "trial", "state")
    )

    prior = pm.sample_prior_predictive(samples=1, random_seed=1)

prior.prior["final_p_non_sum"].sum("state").to_numpy()
array([[[[1., 1., 1., ..., 1., 1., 1.],
         [1., 1., 1., ..., 1., 1., 1.],
         [1., 1., 1., ..., 1., 1., 1.],
         ...,
         [1., 1., 1., ..., 1., 1., 1.],
         [1., 1., 1., ..., 1., 1., 1.],
         [1., 1., 1., ..., 1., 1., 1.]]]])
prior.prior["final_p_non_sum"].mean(("chain", "draw", "subject", "trial")).to_numpy()
array([0.1061953 , 0.40009692, 0.49370778])

However, I'm still a bit unsure about what I'm doing. If I increase the noise a bit, the mean probability of the low state still seems to end up around ~0.11.
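Here is a quick numpy check of that drift, outside PyMC (just an illustration, not a fix): with independent per-state noise, the averaged softmax probabilities get pulled toward the uniform vector as the noise grows, which seems to be why the low state ends up slightly above 0.1.

import numpy as np

rng = np.random.default_rng(1)
p = np.array([0.1, 0.4, 0.5])

for sigma in [0.3, 0.5, 1.0, 2.0]:
    noise = rng.normal(0, sigma, size=(100_000, 3))                     # independent noise per state
    logits = np.log(p) + noise
    probs = np.exp(logits) / np.exp(logits).sum(axis=1, keepdims=True)
    print(sigma, probs.mean(axis=0))                                    # low state drifts above 0.1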


I just realized you're adding a state dimension to the two noise terms, and I'm not sure that's right. This is another approach without it:

import pymc as pm
import numpy as np


n_subjects = 100
n_trials = 100

n_states = 3

subject_noise = 0.5
trial_noise = 0.5

state_probabilities = np.array([0.1, 0.4, 0.5])


coords = {
    "subject": np.arange(n_subjects),
    "trial": np.arange(n_trials),
    "state": np.arange(n_states),
}


with pm.Model(coords=coords) as model:
    initial_log_p = pm.Deterministic(
        "initial_log_p", pm.math.log(state_probabilities), dims="state"
    )

    subj_noise = pm.Normal("subj_noise", 0, subject_noise, dims=("subject"))
    trial_noise = pm.Normal("trial_noise", 0, trial_noise, dims=("subject", "trial"))

    total_noise = pm.Deterministic(
        "total_noise", subj_noise[:, None] + trial_noise, dims=("subject", "trial")
    )

    final_log = pm.Deterministic(
        "final_log", initial_log_p[None, None, :] + total_noise[..., None], dims=("subject", "trial", "state")
    )

    final_p_non_sum = pm.Deterministic(
        "final_p_non_sum", pm.math.softmax(final_log, axis=-1), dims=("subject", "trial", "state")
    )

    dirichlet_sample = pm.Dirichlet(
        "dirichlet_sample", a=final_p_non_sum * 1, dims=("subject", "trial", "state")
    )

    sample_state = pm.Multinomial(
        "sample_state", n=1, p=dirichlet_sample, dims=("subject", "trial", "state")
    )

    prior = pm.sample_prior_predictive(samples=1, random_seed=1)

prior.prior["final_p_non_sum"].sum("state").to_numpy()
array([[[[1., 1., 1., ..., 1., 1., 1.],
         [1., 1., 1., ..., 1., 1., 1.],
         [1., 1., 1., ..., 1., 1., 1.],
         ...,
         [1., 1., 1., ..., 1., 1., 1.],
         [1., 1., 1., ..., 1., 1., 1.],
         [1., 1., 1., ..., 1., 1., 1.]]]])
prior.prior["final_p_non_sum"].mean(("chain", "draw", "subject", "trial")).to_numpy()
array([0.1, 0.4, 0.5])
((prior.prior["sample_state"].data * [1, 2, 3]).sum(axis=-1)[0][0] == 1).mean()
0.0987

I am sorry, I was not clear about the goals of my model. Starting from an original probability vector, I want a model that samples different probabilities for each state after I add subject-specific and trial-specific noise. So my aim is that, at the multinomial, I would have different probabilities per subject and trial that converge, on average, to the original state probabilities.

I tried to run the proposed model, and the softmax function removes all the noise added by subj_noise and trial_noise, so the Dirichlet distribution only samples from the original probability vector.
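Here is a quick numpy check of what I mean (outside PyMC): because the same noise value gets added to every state's log probability, the softmax cancels it out completely.

import numpy as np

p = np.array([0.1, 0.4, 0.5])

def softmax(x):
    e = np.exp(x - x.max())
    return e / e.sum()

shared_noise = 0.7                          # one draw shared across the three states
print(softmax(np.log(p)))                   # [0.1 0.4 0.5]
print(softmax(np.log(p) + shared_noise))    # identical: the shared offset cancels

Based on that, I have some questions: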

  1. If the softmax already normalizes the probabilities so they sum to 1, is there a benefit to the Dirichlet?
  2. Is there a way to feed the multinomial with subject- and trial-specific noisy probability vectors?
  3. When I rerun your pipeline, I get the following error:
Sampling: [dirichlet_sample, sample_state, subj_noise, trial_noise]

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File ~\anaconda3\Lib\site-packages\pytensor\compile\function\types.py:959, in Function.__call__(self, *args, **kwargs)
    957 try:
    958     outputs = (
--> 959         self.vm()
    960         if output_subset is None
    961         else self.vm(output_subset=output_subset)
    962     )
    963 except Exception:

File ~\anaconda3\Lib\site-packages\pytensor\graph\op.py:524, in Op.make_py_thunk.<locals>.rval(p, i, o, n)
    522 @is_thunk_type
    523 def rval(p=p, i=node_input_storage, o=node_output_storage, n=node):
--> 524     r = p(n, [x[0] for x in i], o)
    525     for o in node.outputs:

File ~\anaconda3\Lib\site-packages\pytensor\tensor\random\op.py:403, in RandomVariable.perform(self, node, inputs, outputs)
    402     size = tuple(size)
--> 403 smpl_val = self.rng_fn(rng, *([*args, size]))
    405 if not isinstance(smpl_val, np.ndarray) or str(smpl_val.dtype) != self.dtype:

File ~\anaconda3\Lib\site-packages\pytensor\tensor\random\op.py:174, in RandomVariable.rng_fn(self, rng, *args, **kwargs)
    173 """Sample a numeric random variate."""
--> 174 return getattr(rng, self.name)(*args, **kwargs)

File _generator.pyx:1146, in numpy.random._generator.Generator.normal()

File _common.pyx:600, in numpy.random._common.cont()

File _common.pyx:505, in numpy.random._common.cont_broadcast_2()

File _common.pyx:384, in numpy.random._common.check_array_constraint()

ValueError: scale < 0

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
Cell In[9], line 37
     29 dirichlet_sample = pm.Dirichlet(
     30     "dirichlet_sample", a=final_p_non_sum * 1, dims=("subject", "trial", "state")
     31 )
     33 sample_state = pm.Multinomial(
     34     "sample_state", n=1, p=dirichlet_sample, dims=("subject", "trial", "state")
     35 )
---> 37 prior = pm.sample_prior_predictive(samples=1)

File ~\anaconda3\Lib\site-packages\pymc\sampling\forward.py:450, in sample_prior_predictive(draws, model, var_names, random_seed, return_inferencedata, idata_kwargs, compile_kwargs, samples)
    448 # All model variables have a name, but mypy does not know this
    449 _log.info(f"Sampling: {list(sorted(volatile_basic_rvs, key=lambda var: var.name))}")  # type: ignore
--> 450 values = zip(*(sampler_fn() for i in range(draws)))
    452 data = {k: np.stack(v) for k, v in zip(names, values)}
    453 if data is None:

File ~\anaconda3\Lib\site-packages\pymc\sampling\forward.py:450, in <genexpr>(.0)
    448 # All model variables have a name, but mypy does not know this
    449 _log.info(f"Sampling: {list(sorted(volatile_basic_rvs, key=lambda var: var.name))}")  # type: ignore
--> 450 values = zip(*(sampler_fn() for i in range(draws)))
    452 data = {k: np.stack(v) for k, v in zip(names, values)}
    453 if data is None:

File ~\anaconda3\Lib\site-packages\pytensor\compile\function\types.py:972, in Function.__call__(self, *args, **kwargs)
    970     if hasattr(self.vm, "thunks"):
    971         thunk = self.vm.thunks[self.vm.position_of_error]
--> 972     raise_with_op(
    973         self.maker.fgraph,
    974         node=self.vm.nodes[self.vm.position_of_error],
    975         thunk=thunk,
    976         storage_map=getattr(self.vm, "storage_map", None),
    977     )
    978 else:
    979     # old-style linkers raise their own exceptions
    980     raise

File ~\anaconda3\Lib\site-packages\pytensor\link\utils.py:524, in raise_with_op(fgraph, node, thunk, exc_info, storage_map)
    519     warnings.warn(
    520         f"{exc_type} error does not allow us to add an extra error message"
    521     )
    522     # Some exception need extra parameter in inputs. So forget the
    523     # extra long error message in that case.
--> 524 raise exc_value.with_traceback(exc_trace)

File ~\anaconda3\Lib\site-packages\pytensor\compile\function\types.py:959, in Function.__call__(self, *args, **kwargs)
    956 t0_fn = time.perf_counter()
    957 try:
    958     outputs = (
--> 959         self.vm()
    960         if output_subset is None
    961         else self.vm(output_subset=output_subset)
    962     )
    963 except Exception:
    964     restore_defaults()

File ~\anaconda3\Lib\site-packages\pytensor\graph\op.py:524, in Op.make_py_thunk.<locals>.rval(p, i, o, n)
    522 @is_thunk_type
    523 def rval(p=p, i=node_input_storage, o=node_output_storage, n=node):
--> 524     r = p(n, [x[0] for x in i], o)
    525     for o in node.outputs:
    526         compute_map[o][0] = True

File ~\anaconda3\Lib\site-packages\pytensor\tensor\random\op.py:403, in RandomVariable.perform(self, node, inputs, outputs)
    401 if size is not None:
    402     size = tuple(size)
--> 403 smpl_val = self.rng_fn(rng, *([*args, size]))
    405 if not isinstance(smpl_val, np.ndarray) or str(smpl_val.dtype) != self.dtype:
    406     smpl_val = _asarray(smpl_val, dtype=self.dtype)

File ~\anaconda3\Lib\site-packages\pytensor\tensor\random\op.py:174, in RandomVariable.rng_fn(self, rng, *args, **kwargs)
    172 def rng_fn(self, rng, *args, **kwargs) -> int | float | np.ndarray:
    173     """Sample a numeric random variate."""
--> 174     return getattr(rng, self.name)(*args, **kwargs)

File _generator.pyx:1146, in numpy.random._generator.Generator.normal()

File _common.pyx:600, in numpy.random._common.cont()

File _common.pyx:505, in numpy.random._common.cont_broadcast_2()

File _common.pyx:384, in numpy.random._common.check_array_constraint()

ValueError: scale < 0
Apply node that caused the error: normal_rv{"(),()->()"}(RNG(<Generator(PCG64) at 0x1D998E89700>), MakeVector{dtype='int64'}.0, [[0]], trial_noise)
Toposort index: 7
Inputs types: [RandomGeneratorType, TensorType(int64, shape=(2,)), TensorType(int8, shape=(1, 1)), TensorType(float64, shape=(None, None))]
Inputs shapes: ['No shapes', (2,), (1, 1), (100, 100)]
Inputs strides: ['No strides', (8,), (1, 1), (800, 8)]
Inputs values: [Generator(PCG64) at 0x1D998E89700, array([100, 100], dtype=int64), array([[0]], dtype=int8), 'not shown']
Outputs clients: [[output[12](normal_rv{"(),()->()"}.0)], [normal_rv{"(),()->()"}(RNG(<Generator(PCG64) at 0x1D997402A40>), MakeVector{dtype='int64'}.0, [[0]], trial_noise)]]

Backtrace when the node is created (use PyTensor flag traceback__limit=N to make it longer):
  File "C:\Users\mpoul\anaconda3\Lib\site-packages\IPython\core\async_helpers.py", line 129, in _pseudo_sync_runner
    coro.send(None)
  File "C:\Users\mpoul\anaconda3\Lib\site-packages\IPython\core\interactiveshell.py", line 3269, in run_cell_async
    has_raised = await self.run_ast_nodes(code_ast.body, cell_name,
  File "C:\Users\mpoul\anaconda3\Lib\site-packages\IPython\core\interactiveshell.py", line 3448, in run_ast_nodes
    if await self.run_code(code, result, async_=asy):
  File "C:\Users\mpoul\anaconda3\Lib\site-packages\IPython\core\interactiveshell.py", line 3508, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\mpoul\AppData\Local\Temp\ipykernel_8616\4000903061.py", line 15, in <module>
    trial_noise = pm.Normal("trial_noise", 0, trial_noise, dims=("subject", "trial"))
  File "C:\Users\mpoul\anaconda3\Lib\site-packages\pymc\distributions\distribution.py", line 536, in __new__
    rv_out = cls.dist(*args, **kwargs)
  File "C:\Users\mpoul\anaconda3\Lib\site-packages\pymc\distributions\continuous.py", line 508, in dist
    return super().dist([mu, sigma], **kwargs)
  File "C:\Users\mpoul\anaconda3\Lib\site-packages\pymc\distributions\distribution.py", line 618, in dist
    rv_out = cls.rv_op(*dist_params, size=create_size, **kwargs)

HINT: Use the PyTensor flag `exception_verbosity=high` for a debug print-out and storage map footprint of this Apply node.

The error happens with the last model, right?

Which version of PyMC are you using? I’m on 5.16.2

I think the problem here is related to some over-parametrization issue. One needs to think it through carefully, because there is the restriction that the probabilities have to sum to 1 at all times.

I don't have time to do the math now, but I think it's possible to do something that reflects what you want; one just needs to be careful about how to write the model.

Chapter 22 of Kruschke's book may be helpful.

I'm also on 5.16.2.

Perhaps I didn't copy it well? Here it is again:

import pymc as pm
import numpy as np


n_subjects = 100
n_trials = 100

n_states = 3

subject_noise = 0.5
trial_noise = 0.5

state_probabilities = np.array([0.1, 0.4, 0.5])


coords = {
    "subject": np.arange(n_subjects),
    "trial": np.arange(n_trials),
    "state": np.arange(n_states),
}


with pm.Model(coords=coords) as model:
    initial_log_p = pm.Deterministic(
        "initial_log_p", pm.math.log(state_probabilities), dims="state"
    )

    subj_noise = pm.Normal("subj_noise", 0, subject_noise, dims=("subject"))
    trial_noise = pm.Normal("trial_noise", 0, trial_noise, dims=("subject", "trial"))

    total_noise = pm.Deterministic(
        "total_noise", subj_noise[:, None] + trial_noise, dims=("subject", "trial")
    )

    final_log = pm.Deterministic(
        "final_log", initial_log_p[None, None, :] + total_noise[..., None], dims=("subject", "trial", "state")
    )

    final_p_non_sum = pm.Deterministic(
        "final_p_non_sum", pm.math.softmax(final_log, axis=-1), dims=("subject", "trial", "state")
    )

    dirichlet_sample = pm.Dirichlet(
        "dirichlet_sample", a=final_p_non_sum * 1, dims=("subject", "trial", "state")
    )

    sample_state = pm.Multinomial(
        "sample_state", n=1, p=dirichlet_sample, dims=("subject", "trial", "state")
    )

    prior = pm.sample_prior_predictive(samples=1, random_seed=1)

prior.prior["final_p_non_sum"].sum("state").to_numpy()

This works repeatedly. My main caveat is that, at the end, you sample without noise. I think it's because adding log-odds noise to the log odds of the original probability vector is not symmetrical once you revert to probability space. However, if I sample noise straight in probability space, for example pm.Normal("subj_noise", 0, .05, dims=("subject")), then for infrequent probabilities there is a levelling effect where you can only go so far below .1, while you can asymmetrically increase toward 1.