I have been working with PyMC3 for a while and have gone through several tutorials and examples. However, I am not sure whether I am approaching the Bayesian method correctly. My approach is below:
import warnings
import numpy as np
import pandas as pd
import scipy.stats as st
from scipy.optimize import curve_fit
import theano.tensor as tt
import pymc3 as pm
from pymc3.distributions import Interpolated
from pymc3.distributions.dist_math import bound
from pymc3.distributions.continuous import assert_negative_support, get_tau_sigma
from pymc3.distributions.distribution import Continuous, draw_values, generate_samples
# Note: THEANO_FLAGS only takes effect as an environment variable set *before*
# theano is imported (e.g. os.environ['THEANO_FLAGS'] = 'device=cuda,floatX=float32');
# assigning it to a plain Python variable here has no effect.
class Hypsecant(Continuous):
    """
    Hyperbolic secant log-likelihood.

    Parameters
    ----------
    mu: float
        Location parameter.
    sigma: float
        Scale parameter (sigma > 0). Only required if lam is not specified.
    lam: float
        Precision parameter (lam > 0). Only required if sigma is not specified.
    """
    def __init__(self, mu=None, sigma=None, lam=None, sd=None, *args, **kwargs):
        if sd is not None:
            sigma = sd
            warnings.warn("sd is deprecated, use sigma instead", DeprecationWarning)
        lam, sigma = get_tau_sigma(tau=lam, sigma=sigma)
        self.lam = lam = tt.as_tensor_variable(lam)
        self.sigma = self.sd = sigma = tt.as_tensor_variable(sigma)
        self.mean = self.median = self.mode = self.mu = mu = tt.as_tensor_variable(mu)
        # The standard (SciPy-parametrized) hyperbolic secant has variance
        # pi**2 / 4, which scales with sigma**2
        self.variance = (np.pi ** 2 / 4.0) * sigma ** 2
        # mu is a location parameter and may be negative, so only the scale
        # needs a positivity check
        assert_negative_support(sigma, 'sigma (lam)', 'Hypsecant')
        super().__init__(*args, **kwargs)
def random(self, point=None, size=None):
"""
Draw random values from Hypsecant distribution.
Parameters
----------
point: dict, optional
Dict of variable values on which random values are to be
conditioned (uses default point if not specified).
size: int, optional
Desired size of random sample (returns one sample if not
specified).
Returns
-------
array
"""
        mu, sigma = draw_values([self.mu, self.sigma], point=point, size=size)
        return generate_samples(st.hypsecant.rvs, loc=mu, scale=sigma,
                                dist_shape=self.shape, size=size)
def logp(self, value):
"""
Calculate log-probability of Hypsecant distribution at specified value.
Parameters
----------
value: numeric
Value(s) for which log-probability is calculated. If the log probabilities for multiple
values are desired the values must be provided in a numpy array or theano tensor
Returns
-------
TensorVariable
"""
        mu = self.mu
        sigma = self.sigma
        # Standardize the value; the original version ignored mu and sigma,
        # so the likelihood could not inform those parameters
        z = (value - mu) / sigma
        # log of the hyperbolic secant density
        # f(x) = 1 / (pi * sigma * cosh((x - mu) / sigma))
        Px = pm.math.log(1.0 / (np.pi * sigma * pm.math.cosh(z)))
        return bound(Px, sigma > 0)
def logcdf(self, value):
"""
Compute the log of the cumulative distribution function for Hypsecant distribution
at the specified value.
Parameters
----------
value: numeric
Value(s) for which log CDF is calculated. If the log CDF for multiple
values are desired the values must be provided in a numpy array or theano tensor.
Returns
-------
TensorVariable
"""
        mu = self.mu
        sigma = self.sigma
        z = (value - mu) / sigma
        # CDF of the hyperbolic secant: F(x) = (2 / pi) * arctan(exp(z))
        return bound(pm.math.log(2.0 / np.pi * tt.arctan(tt.exp(z))), sigma > 0)
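# Sanity check (a minimal sketch, not part of the model): the custom logp
# should agree with SciPy's reference implementation when loc/scale match.
# Uncomment to verify:
# print(Hypsecant.dist(mu=0.0, sigma=1.0).logp(0.5).eval())    # PyMC3
# print(st.hypsecant.logpdf(0.5, loc=0.0, scale=1.0))          # SciPy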
def hypsecan(x, x0, sigma):
    """Hyperbolic secant PDF with location x0 and scale sigma (curve_fit model)."""
    return st.hypsecant.pdf(x, loc=x0, scale=sigma)
def get_hist(data, data_size):
    # Density-normalized histogram of the data; explicit bin rules such as
    # 'fd' or 'sturges' (via np.histogram_bin_edges) could be substituted here
    Hist = np.histogram(a=data, density=True)
return Hist
def get_best_distribution(data):
    # Candidate distributions for the KS test; other scipy.stats names
    # (e.g. "norm", "laplace", "logistic", "moyal", "rayleigh") can be added
    # to this list
    dist_names = ["hypsecant"]
dist_results = []
params = {}
for dist_name in dist_names:
dist = getattr(st, dist_name)
param = dist.fit(data)
params[dist_name] = param
# Applying the Kolmogorov-Smirnov test
D, p = st.kstest(data, dist_name, args=param)
print("p value for " + dist_name + " = " + str(p))
dist_results.append((dist_name, p))
    # Select the best-fitting distribution (largest KS p value)
    best_dist, best_p = max(dist_results, key=lambda item: item[1])
# store the name of the best fit and its p value
print("Best fitting distribution: " + str(best_dist))
print("Best p value: " + str(best_p))
print("Parameters for the best fit: " + str(params[best_dist]))
return best_dist, best_p, params[best_dist]
def make_pdf(data, dist, params, size):
"""Generate distributions's Probability Distribution Function """
# Get histogram of original data
Hist_list = get_hist(data, size)
Hist_data, bin_edges = Hist_list
bin_mid = (bin_edges + np.roll(bin_edges, -1))[:-1] / 2.0 # go from bin edges to bin middles
# Separate parts of parameters
arg = params[:-2]
loc = params[-2]
scale = params[-1]
# Get sane start and end points of distribution
start = dist.ppf(0.01, *arg, loc=loc, scale=scale) if arg else dist.ppf(0.01, loc=loc, scale=scale)
end = dist.ppf(0.99, *arg, loc=loc, scale=scale) if arg else dist.ppf(0.99, loc=loc, scale=scale)
    ## Fitting the curve:
    # Evaluate the PDF on the histogram bin edges and rescale it so that its
    # area matches the area of the histogram
    size = int(size)
    x = bin_edges
    y = dist.pdf(x, *arg, loc=loc, scale=scale)
    scale_pdf = np.trapz(Hist_data, bin_mid) / np.trapz(y, x)
    y *= scale_pdf
    # Refit the distribution to the data to get sane start and end points
    param = dist.fit(data)
    arg_2 = param[:-2]
    loc_2 = param[-2]
    scale_2 = param[-1]
    start = dist.ppf(0.01, *arg_2, loc=loc_2, scale=scale_2) if arg_2 else dist.ppf(0.01, loc=loc_2, scale=scale_2)
    end = dist.ppf(0.99, *arg_2, loc=loc_2, scale=scale_2) if arg_2 else dist.ppf(0.99, loc=loc_2, scale=scale_2)
    # Fit the hyperbolic secant PDF to the rescaled histogram curve
    popt, pcov = curve_fit(hypsecan, x, y, p0=[loc, scale])
    x = np.linspace(start, end, size)
    y = hypsecan(x, *popt)
    scale_pdf = np.trapz(Hist_data, bin_mid) / np.trapz(y, x)
    y *= scale_pdf
    pdf = pd.Series(y, x)
    return pdf, x, y
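# Illustrative usage of the fitting pipeline on raw samples (the variable
# name 'samples' is an assumption, not defined above):
# name, p, params = get_best_distribution(samples)
# pdf, x, y = make_pdf(samples, getattr(st, name), params, size=len(samples))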
def pdf_Bayesian_Interface(data):
    with pm.Model() as model:
        # Priors for the unknown model parameters:
        sigma = pm.HalfNormal('sigma', sd=5)
        mu = pm.Normal('mu', mu=0, sd=5)
        # Likelihood (sampling distribution) of the observations:
        observed_data = Hypsecant('observed_data', mu=mu, sigma=sigma, observed=data)
        # Draw 1000 posterior samples per chain (3 chains, 500 tuning steps each)
        trace = pm.sample(draws=1000, tune=500, chains=3, cores=1, progressbar=True)
        # Posterior predictive sampling:
        post_pred = pm.sample_posterior_predictive(trace, samples=1000)
        print(post_pred['observed_data'].shape)
        print('\nSummary: ')
        print(pm.stats.summary(data=trace))
        print(pm.stats.summary(data=post_pred))
return trace, post_pred
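# A minimal sketch of one way to turn posterior samples into priors for a
# follow-up model (adapted from the PyMC3 "Updating priors" example notebook).
# 'from_posterior' is an illustrative helper name, not part of the PyMC3 API.
def from_posterior(param, samples):
    smin, smax = np.min(samples), np.max(samples)
    width = smax - smin
    x = np.linspace(smin, smax, 100)
    # Smooth the samples with a KDE and wrap the result as an Interpolated prior
    y = st.gaussian_kde(samples)(x)
    # Pad the support so the new prior keeps some mass outside the old range
    x = np.concatenate([[x[0] - 3 * width], x, [x[-1] + 3 * width]])
    y = np.concatenate([[0], y, [0]])
    return Interpolated(param, x, y)

# Usage (illustrative), inside a new pm.Model() context:
#     mu = from_posterior('mu', trace['mu'])
#     sigma = from_posterior('sigma', trace['sigma'])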
######## Main Program:
print('#######################################################################################################')
print('Accessing Bayesian PDF for input data:')
trace, post_pred = pdf_Bayesian_Interface(data=pdfdata)
best_dist = getattr(st, 'hypsecant')
param = [np.mean(trace['mu']), np.mean(trace['sigma'])]
print('Parameters values = ', param)
pdf, x_axis_pdf, y_axis_pdf = make_pdf(data=all_peaks, dist=best_dist, params=param, size=len(trace))
print('#######################################################################################################')
I have the following questions:

- Is my approach correct?
- I have a related question about Bayesian updating (see my earlier posts: Bayesian fusion question and Bayesian updating question): is there a way to extract parameters from the trace to use as priors?
- How can I extract parameters to use as the posterior distribution (using the code above)?
- Is my code for the custom distribution done correctly?