For anyone coming to this later: I have reworked the tutorial in the ArviZ documentation a bit (@OriolAbril) so that it works directly with the model instead of having to reconstruct the likelihood. I don't think it can be made completely general, but this is pretty close and should need only a few changes to work on any other model. Also @Simon, I think there are some other problems in the example given in the link. For instance, here:
def log_likelihood__i(self, excluded_observed_data, idata__i):
    log_lik__i = idata.log_likelihood['y']
    return log_lik__i
What should really be returned here is the log likelihood of the excluded observations given the parameters from the posterior trained on the non-excluded observations (as in the ArviZ guide page). This snippet instead just returns the log likelihood from the original idata. Not even something depending on idata__i would be correct here, since that would be the log likelihood of the non-excluded observations, which is not what you want, if I am not misunderstanding reloo. In any case, the code for the linear regression example is below; I have checked it against the example there and it does exactly the same thing.
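To make the target concrete, here is a minimal sketch of the quantity log_likelihood__i has to return for this linear model (plain numpy/scipy, hypothetical helper name, not part of the wrapper): the log density of the excluded point evaluated at every chain/draw of the refitted posterior.

import numpy as np
from scipy import stats

# Hypothetical illustration only: post holds (chain, draw) arrays of
# draws from the posterior refitted WITHOUT observation i, and we
# evaluate the excluded pair (x_i, y_i) under each draw.
def log_lik_excluded(y_i, x_i, post):
    mu = post["b0"] + post["b1"] * x_i
    return stats.norm.logpdf(y_i, loc=mu, scale=post["sigma_e"])

The wrapper below computes exactly this quantity, but through the PyMC model itself via pm.logp, so the likelihood never has to be written out by hand.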
import arviz as az
import pymc as pm
import numpy as np
import pytensor.tensor as pt
from xarray import DataArray
class PyMCLinRegWrapper(az.PyMCSamplingWrapper):
    def sample(self, modified_observed_data):
        with self.model:
            # If the model has coords, the dim needs to be updated before
            # modifying the data in the model with set_data;
            # otherwise we would not need to overwrite the sample method.
            n__i = len(modified_observed_data["x"])
            self.model.set_dim("time", n__i, coord_values=np.arange(n__i))
            pm.set_data(modified_observed_data)
            idata = pm.sample(
                **self.sample_kwargs,
                return_inferencedata=True,
                idata_kwargs={"log_likelihood": True},
            )
        return idata

    def log_likelihood__i(self, excluded_observed_data, idata__i):
        # Posterior trained on the non-excluded observations
        post = idata__i.posterior
        # Create a model where x is set to the excluded x.
        # The value of y does not really matter, but it is also set.
        _model = linreg_model()
        _model.set_dim(
            "time",
            excluded_observed_data["x"].size,
            coord_values=excluded_observed_data["x"],
        )
        pm.set_data(excluded_observed_data, model=_model)
        # Compile the function log likelihood(obs | params)
        value = pt.tensor("value", dtype=float, shape=())
        logp_fun = pm.compile_pymc(
            [value, _model["b0"], _model["b1"], _model["sigma_e"]],
            pm.logp(_model["y"], value),
        )
        logp_fun = np.vectorize(logp_fun)
        # Log likelihood of the excluded obs given params trained on the
        # non-excluded ones
        return DataArray(
            logp_fun(
                value=excluded_observed_data["y_obs"],
                b0=post["b0"],
                b1=post["b1"],
                sigma_e=post["sigma_e"],
            )
        )

    def sel_observations(self, idx):
        xdata = self.idata_orig["constant_data"]["x"]
        ydata = self.idata_orig["observed_data"]["y"]
        mask = np.isin(np.arange(len(xdata)), idx)
        data_dict = {"x": xdata, "y_obs": ydata}
        # Data used to refit (excluded observation dropped) and the
        # excluded observation itself
        data__i = {key: value.values[~mask] for key, value in data_dict.items()}
        data_ex = {key: value.isel(time=idx) for key, value in data_dict.items()}
        return data__i, data_ex
def linreg_model():
    # The data and coords start empty on purpose: don't forget to resize
    # the model with set_dim and set_data before sampling.
    coords = {"time": []}
    x = []
    y = []
    with pm.Model(coords=coords) as model:
        x = pm.Data("x", x, dims="time")
        y_obs = pm.Data("y_obs", y, dims="time")
        b0 = pm.Normal("b0", 0, 10)
        b1 = pm.Normal("b1", 0, 10)
        sigma_e = pm.HalfNormal("sigma_e", 10)
        pm.Normal("y", b0 + b1 * x, sigma_e, observed=y_obs, dims="time")
    return model
sample_kwargs = {"chains": 4, "draws": 500}
seed = 4
rng = np.random.default_rng(seed)

xdata = np.linspace(0, 50, 100)
b0, b1, sigma = -2, 1, 3
ydata = rng.normal(loc=b1 * xdata + b0, scale=sigma)
ydata[50] += 50  # perturb one observation so it gets a high pareto k

with linreg_model() as model:
    model.set_dim("time", xdata.size, coord_values=xdata)
    pm.set_data({"x": xdata, "y_obs": ydata})
    idata = pm.sample(**sample_kwargs)
    pm.compute_log_likelihood(idata, extend_inferencedata=True)

loo_orig = az.loo(idata, pointwise=True)
print(loo_orig)
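Finally, to actually run the refits you hook the wrapper into az.reloo, as in the ArviZ guide (a sketch; the constructor arguments are the ones az.SamplingWrapper accepts):

pymc_wrapper = PyMCLinRegWrapper(
    model=model, idata_orig=idata, sample_kwargs=sample_kwargs
)
# Refits the model once per high-pareto-k observation and replaces
# those pointwise values in the original loo results
loo_relooed = az.reloo(pymc_wrapper, loo_orig=loo_orig)
print(loo_relooed)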