In any case, @Ray_Kruger this is a proof of concept you can take a look:
Say you have model at first level as:
mu0, sigma0 = 10., 1.5
y0 = np.random.normal(mu0, sigma0, size=1000)
with pm.Model() as m:
mu = pm.Normal('mu', 0, 100)
sigma = pm.HalfNormal('sigma', 10)
y = pm.Normal('y', mu, sigma, observed=y0)
trace = pm.sample(1000)
and another model like:
beta0 = 2.
sigma_new = 4.
X = np.linspace(-10, 10, 100)
y1 = np.random.normal(X * beta0, sigma_new)
with pm.Model() as m2:
beta = pm.Normal('beta', 0, 100)
sigma = pm.HalfNormal('sigma', 10)
y = pm.Normal('y', X * beta, sigma, observed=y1)
To have sigma
fixed in m2
using the posterior from m1
, you can do:
from pymc.step_methods.arraystep import ArrayStepShared
from pymc.blocking import RaveledVars
class NormalProposal:
def __init__(self, loc, scale):
self.loc = loc
self.scale = scale
def __call__(self, rng=None, size=()):
if rng is None:
rng = np.random
return rng.normal(self.loc, scale=self.scale, size=size)
class FixedDistSample(ArrayStepShared):
"""Return sample from a fixed proposal distribution.
"""
name = "fixed_dist_sample"
generates_stats = False
def __init__(self, vars, proposal_kwarg_dict, model=None):
model = pm.modelcontext(model)
initial_values = model.initial_point()
vars = [model.rvs_to_values.get(var, var) for var in vars]
vars = pm.inputvars(vars)
initial_values_shape = [initial_values[v.name].shape for v in vars]
self.ndim = int(sum(np.prod(ivs) for ivs in initial_values_shape))
self.proposal_dist = NormalProposal(**proposal_kwarg_dict)
shared = pm.make_shared_replacements(initial_values, vars, model)
super().__init__(vars, shared)
def astep(self, q0: RaveledVars) -> RaveledVars:
point_map_info = q0.point_map_info
q0 = q0.data
q = self.proposal_dist(size=self.ndim)
return RaveledVars(q, point_map_info)
from scipy.stats import t, norm
sigma_log__ = m.rvs_to_values[m.free_RVs[1]].tag.transform.forward(np.ravel(trace.posterior['sigma'].values))
sigma_log__ = aesara.function([], sigma_log__)()
# df, loc, scale = t.fit(sigma_log__)
loc, scale = norm.fit(sigma_log__)
with m2:
fixed_step = FixedDistSample([sigma], {'loc': loc, 'scale': scale})
trace3 = pm.sample(1000, step=fixed_step)
az.plot_trace(trace3);
You will see that sigma in trace3
follow the posterior from m
instead of m2
.
You can find the complete notebook here: link