I am using a Prophet-like model for time series forecasting. Just like Facebook's Prophet, it takes time t as its only predictor and models the series with a GAM, breaking it down into trend and seasonality components (y(t) = trend(t) + weekly_seasonality(t) + yearly_seasonality(t) + noise). My implementation is given below. Notice that I pass an old_trace parameter to my class, which determines whether I create my priors from scratch or build them using the prior_from_idata function from pymc-experimental.
import numpy as np
import pandas as pd
import pymc as pm
import pymc_experimental as pmx
import pytensor as pyt

NANOSECONDS_TO_SECONDS = 1000 * 1000 * 1000

class MyProphet:
    def __init__(self,
                 n_changepoints=25,
                 changepoints_prior_scale=0.05,
                 changepoint_range=0.8,
                 seasonality_prior_scale=10.0,
                 old_trace=None,
                 ):
        self.n_changepoints = n_changepoints
        self.changepoints_prior_scale = changepoints_prior_scale
        self.changepoint_range = changepoint_range
        self.seasonality_prior_scale = seasonality_prior_scale
        self.old_trace = old_trace
        self.model = None
        self.trace = None
        self.data = None
        self.y_min = None
        self.y_max = None
        self.ds_min = None
        self.ds_max = None
        # NOTE: the code below also references k_mean, k_sd, delta_mean,
        # m_mean, m_sd, beta_yearly_mean, beta_weekly_mean, sigma and
        # mcmc_samples; they are set elsewhere in my code, with
        # placeholder values shown here so the snippet runs:
        self.k_mean, self.k_sd = 0.0, 5.0
        self.delta_mean = 0.0
        self.m_mean, self.m_sd = 0.0, 5.0
        self.beta_yearly_mean = self.beta_weekly_mean = 0.0
        self.sigma = 0.5
        self.mcmc_samples = 1000
    def _scale_data(self):
        # min-max scale the timestamps to [0, 1] and divide the target by
        # its max absolute value, mirroring Prophet's internal scaling
        self.y_min = 0
        self.y_max = self.data["y"].abs().max()
        self.ds_min = self.data["ds"].min()
        self.ds_max = self.data["ds"].max()
        self.data["y"] = self.data["y"] / self.y_max
        self.data["t"] = (self.data["ds"] - self.ds_min) / (self.ds_max - self.ds_min)

    def _process_data(self):
        self.data["ds"] = pd.to_datetime(self.data["ds"])
        self.data.sort_values("ds", inplace=True)
        self._scale_data()
    def _add_trend(self, priors):
        t = np.array(self.data["t"])
        # changepoints are spread evenly over the first changepoint_range
        # fraction of the history
        hist_size = int(np.floor(self.data.shape[0] * self.changepoint_range))
        cp_indexes = np.linspace(0, hist_size - 1, self.n_changepoints + 1).round().astype(int)
        s = np.array(self.data.iloc[cp_indexes]["t"].tail(-1))
        # * 1 casts the booleans to integers
        A = (t[:, None] > s) * 1
        with self.model:
            # initial growth
            if self.old_trace is None:
                k = pm.Normal("k", self.k_mean, self.k_sd)
            else:
                k = priors["k"]
            changepoints_prior_scale = self.changepoints_prior_scale
            if self.changepoints_prior_scale is None:
                if self.old_trace is None:
                    changepoints_prior_scale = pm.Exponential("tau", 1.5)
                else:
                    changepoints_prior_scale = priors["tau"]
            # rate of change at each changepoint
            if self.old_trace is None:
                delta = pm.Laplace("delta", self.delta_mean, changepoints_prior_scale,
                                   shape=self.n_changepoints)
            else:
                delta = priors["delta"]
            # offset
            if self.old_trace is None:
                m = pm.Normal("m", self.m_mean, self.m_sd)
            else:
                m = priors["m"]
            # adjust the offsets so the trend stays continuous at each changepoint
            gamma = -s * delta
            trend = pm.Deterministic("trend",
                                     (k + pyt.tensor.dot(A, delta)) * t
                                     + (m + pyt.tensor.dot(A, gamma)))
        return trend, A, s
    def _fourier_series(self, data, period=365.25, series_order=10):
        # convert the timestamps to days since the epoch
        t = data["ds"].to_numpy(dtype=np.int64) // NANOSECONDS_TO_SECONDS / (3600 * 24.)
        x_T = t * np.pi * 2
        fourier_components = np.empty((data["ds"].shape[0], 2 * series_order))
        for i in range(series_order):
            c = x_T * (i + 1) / period
            fourier_components[:, 2 * i] = np.sin(c)
            fourier_components[:, (2 * i) + 1] = np.cos(c)
        return fourier_components

    def _get_seasonality_n_x(self, data, period):
        # Prophet's defaults: 10 Fourier terms for yearly, 3 for weekly
        if period == "yearly":
            n = 10
            p = 365.25
        else:  # weekly
            n = 3
            p = 7
        return n, self._fourier_series(data, p, n)
    def _add_seasonality(self, period, mean, priors):
        n, x = self._get_seasonality_n_x(self.data, period)
        with self.model:
            if self.old_trace is None:
                beta = pm.Normal(f"beta_{period}", mu=mean, sigma=self.seasonality_prior_scale,
                                 shape=2 * n)
            else:
                beta = priors[f"beta_{period}"]
        return x, beta
    def fit(self, data):
        self.data = data.reset_index(drop=True)
        self._process_data()
        self.model = pm.Model()
        with self.model:
            priors = None
            if self.old_trace is not None:
                # build the priors from the previous posterior instead of from scratch
                priors = pmx.utils.prior.prior_from_idata(
                    self.old_trace,
                    var_names=["k", "m", "delta", "beta_yearly", "beta_weekly"]
                              + (["tau"] if self.changepoints_prior_scale is None else [])
                )
            trend, A, s = self._add_trend(priors)
            x_yearly, beta_yearly = self._add_seasonality("yearly", self.beta_yearly_mean, priors)
            x_weekly, beta_weekly = self._add_seasonality("weekly", self.beta_weekly_mean, priors)
            trend += pyt.tensor.dot(x_yearly, beta_yearly) + pyt.tensor.dot(x_weekly, beta_weekly)
            sigma = pm.HalfNormal("sigma", self.sigma)
            obs = pm.Normal("obs", mu=trend, sigma=sigma, observed=self.data["y"])
            self.trace = pm.sample(self.mcmc_samples, return_inferencedata=True)
I first fit a model without passing an old_trace. After that, I try to create a new model that is given an old_trace, which in my code means that instead of specifying the shapes and parameters for the priors manually, I create them using prior_from_idata. What I am trying to do looks something like this:
df = ...  # load a pandas DataFrame with "ds" and "y" columns from a CSV

model = MyProphet()
model.fit(df)

next_model = MyProphet(old_trace=model.trace)
next_model.fit(df)
What I noticed is that, while sampling, next_model uses far fewer free parameters than model: model samples all the priors I set manually, whereas next_model treats the priors created with prior_from_idata as deterministic variables and only samples the multivariate normal approximation.
To compare sampling times: on my data, model takes around 5 minutes to finish, while next_model takes around 15 seconds.
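To make the difference concrete, this is a minimal check using PyMC's Model.free_RVs and Model.deterministics properties, with the two fitted instances from above:

# which variables does the sampler actually draw, and which are merely derived?
for name, wrapper in [("model", model), ("next_model", next_model)]:
    print(name, "free RVs:", [rv.name for rv in wrapper.model.free_RVs])
    print(name, "deterministics:", [d.name for d in wrapper.model.deterministics])

For next_model, k, m, delta and the seasonality betas all show up under deterministics; the only sampled variables are the multivariate normal base created by prior_from_idata and sigma.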
This is the graphical representation of model:
And this is the graphical representation of next_model:
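(Renderings like these can be reproduced with pm.model_to_graphviz on the fitted instances from above:)

pm.model_to_graphviz(model.model)
pm.model_to_graphviz(next_model.model)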
Is this the way prior_from_idata is supposed to be used? I was under the impression that the priors obtained from prior_from_idata would be free random variables, not deterministic ones, yet the way I am using it creates deterministic variables. If I am doing something wrong, can somebody point me to an example of how prior_from_idata is meant to be used?
I also recreated the updating priors example, and there my priors were free random variables, even though I obtained them from the posterior using kernel density estimation and the Interpolated class from pymc.
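For reference, that example builds each updated prior with a KDE-based helper roughly like the following (a sketch along the lines of the notebook's from_posterior helper; it has to be called inside a model context):

import numpy as np
import pymc as pm
from scipy import stats

def from_posterior(param, samples):
    # KDE of the posterior samples, interpolated into a new prior;
    # the support is padded so the prior keeps mass beyond the observed range
    smin, smax = np.min(samples), np.max(samples)
    width = smax - smin
    x = np.linspace(smin, smax, 100)
    y = stats.gaussian_kde(samples)(x)
    x = np.concatenate([[x[0] - 3 * width], x, [x[-1] + 3 * width]])
    y = np.concatenate([[0], y, [0]])
    # must be called inside a `with pm.Model():` block
    return pm.Interpolated(param, x, y)

Each Interpolated prior built this way is a genuine free random variable in the new model, which is the behavior I expected from prior_from_idata as well.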