I am trying to fit a gaussian random walk to an monthly trend of daily data and am wondering how to broadcast the random walk distribution to the daily data.
Look at the below example which will throw an error at:
mu = (math.e**slopes) * x since the shapes does not match
How should one go about this? i assume if i have the answer to this question i will understand to do it on an AR distribution aswell which is what i am really looking for.
def supersimplegeneratorv3(n=365):
seed = 42
np.random.seed(seed)
def applytrend(arr, indexes, months, trendvals):
for month in months:
val = trendvals[month]
mask = indexes.month == month
indexes_ = np.nonzero(mask)[0]
for idx in indexes_:
arr[idx] = arr[idx] * val
return arr
data = pd.DataFrame({
"date": pd.date_range(start="1/1/2018", periods=n),
"x1": 0 + np.abs(np.random.normal(400, 200, size=n)),
})
data.index = data["date"]
months = set(data.index.month.tolist())
trendfa = [np.random.normal(0, 0.2) for _ in months]
trendvalsfa = {month: val for month, val in zip(months, trendfa)}
fa = data["x1"].values
processedvalsfa = applytrend(arr=fa,
indexes=data.index,
months=months,
trendvals={val: math.e ** x for val, x in trendvalsfa.items()})
noise_scale = 10
noise = np.random.default_rng(seed).normal(0, noise_scale, n)
response = processedvalsfa + noise
response = np.where(response < 0, 0, response)
df = pd.DataFrame.from_dict(data={"x1": data["x1"], "response": response, "date": data["date"]})
df.index = df["date"]
return df
def gaussrandomwalkmodel(frame):
with pm.Model() as model_2:
x = frame["x1"].values
months = math.floor(frame.shape[0] / 30)
sigma_slope = pm.HalfNormal(name="sigma_slope", sigma=0.05)
slopes = pm.GaussianRandomWalk(
name="slopes",
sigma=sigma_slope,
init=pm.distributions.continuous.Normal.dist(
mu=0, sigma=2
),
shape=(months,)
)
mu = (math.e**slopes) * x
sigma = pm.HalfNormal(name="sigma", sigma=10)
nu = pm.Gamma(name="nu", alpha=25, beta=2)
likelihood = pm.StudentT(name="outcome",
nu=nu,
mu=mu,
sigma=sigma,
observed=frame["response"]
)
with model_2:
map_ = pm.find_MAP()
trace = pm.sample(1000,
tune=1000,
cores=4,
return_inferencedata=True)
var_names = [x for x in trace.posterior
if "trend" in x]
az.plot_trace(trace, var_names=var_names, legend=False, figsize=(24, len(var_names) * 4))
az.plot_posterior(trace, var_names=var_names)
def runsupersimplemodelv3():
df = supersimplegeneratorv3(n=300)
gaussrandomwalkmodel(frame=df)
if __name__ == "__main__":
runsupersimplemodelv3()