Creating a Data Fusion Model with PyMC3

I have been looking into data fusion methods, and what caught my eye is the Kalman filter, which fuses data using the mean and variance of a Gaussian distribution and runs a predict/correct cycle from a weaker sensor to a stronger/more accurate one. I looked into the following GitHub links to get a better understanding of fusion:

  1. Kalman and Bayesian Filters in Python
  2. Kalman Filter in 1 Dimension

In the first link, they discuss the discrete Bayesian filter, but they do not cover a continuous Bayesian filter. So I thought I would follow the same steps, borrowing the idea from the Kalman filter, to implement a continuous Bayesian filter with the help of the PyMC3 package. The steps involved can be found in the second link, and the code is below.

import numpy as np
import scipy.stats as st
import matplotlib.pyplot as plt

mean = mean0
var = var0

for m in range(len(distances)):
    # Predict: propagate the state with the motion model
    # (predict/correct are defined in the linked tutorial)
    var, mean = predict(var, mean, varMove, distances[m])
    # norm.pdf expects the standard deviation, not the variance
    # (mlab.normpdf was removed from matplotlib, so scipy is used here)
    plt.plot(x, st.norm.pdf(x, mean, np.sqrt(var)), label='%i. step (Prediction)' % (m + 1))

    # Correct: update the state with the sensor measurement
    var, mean = correct(var, mean, varSensor, positions[m])
    print('After correction:  mean= %.2f\tvar= %.2f' % (mean, var))
    plt.plot(x, st.norm.pdf(x, mean, np.sqrt(var)), label='%i. step (Correction)' % (m + 1))
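For reference, here is a minimal sketch of what the predict and correct helpers from the second link compute: the convolution and the product of two Gaussians, respectively (my reconstruction of the standard 1D equations, not the tutorial's exact code):

def predict(var, mean, var_move, distance):
    # Convolution of two Gaussians: uncertainties add during motion
    return var + var_move, mean + distance

def correct(var, mean, var_sensor, position):
    # Product of two Gaussians: precision-weighted average of the
    # prediction and the measurement
    new_mean = (var_sensor * mean + var * position) / (var + var_sensor)
    new_var = 1.0 / (1.0 / var + 1.0 / var_sensor)
    return new_var, new_mean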

However, I am not sure whether I am doing these steps correctly. For the data I am using, I followed the approach from this link:

Fitting empirical distribution to theoretical ones with Scipy

Based on the link above and its distribution list, the Student's t-distribution fits my data best. The data fusion idea is to fit the initial data first, feed the fitted parameters into the prediction model, and then feed the prediction model's parameters into the correction model. Here is my approach to the fusion:

def S2_0_Bayesian_Interface(data):
########################################################################################################################
    with pm.Model() as model_0:
        # Prior distributions for the unknown model parameters:
        nu_0 = pm.HalfNormal('nu_0', sigma=10)
        sigma_0 = pm.HalfNormal('sigma_0', sigma=10)
        mu_0 = pm.Normal('mu_0', mu=0, sigma=10)

        # Likelihood (sampling distribution) of the observations:
        observed_data_0 = pm.StudentT('observed_data_0', nu=nu_0, mu=mu_0, sigma=sigma_0, observed=data)

        # Draw 5000 posterior samples:
        trace_0 = pm.sample(draws=5000, tune=3000, chains=2, cores=1)

        # Obtain posterior predictive samples:
        post_pred_0 = pm.sample_posterior_predictive(trace_0, samples=3000)
        print(post_pred_0['observed_data_0'].shape)
        print('\nSummary: ')
        print(pm.stats.summary(data=trace_0))
        print(pm.stats.summary(data=post_pred_0))
########################################################################################################################
    return trace_0, post_pred_0

def S2_P_Bayesian_Interface(nu, mu, sigma, data):
########################################################################################################################
    with pm.Model() as model_P:
        # Prior distributions for the unknown model parameters,
        # parameterized by the previous model's posterior means:
        nu_P = pm.HalfNormal('nu_P', sigma=sigma)
        sigma_P = pm.HalfNormal('sigma_P', sigma=sigma)
        mu_P = pm.Normal('mu_P', mu=mu, sigma=sigma)

        # Likelihood (sampling distribution) of the observations:
        observed_data_P = pm.StudentT('observed_data_P', nu=nu_P, mu=mu_P, sigma=sigma_P, observed=data)

        # Draw 5000 posterior samples:
        trace_P = pm.sample(draws=5000, tune=3000, chains=2, cores=1)

        # Obtain posterior predictive samples:
        post_pred_P = pm.sample_posterior_predictive(trace_P, samples=3000)
        print(post_pred_P['observed_data_P'].shape)
        print('\nSummary: ')
        print(pm.stats.summary(data=trace_P))
        print(pm.stats.summary(data=post_pred_P))
########################################################################################################################
    return trace_P, post_pred_P

def S1_C_Bayesian_Interface(nu, mu, sigma, data):
########################################################################################################################
    with pm.Model() as model_C:
        # Prior distributions for the unknown model parameters,
        # parameterized by the prediction model's posterior means:
        nu_C = pm.HalfNormal('nu_C', sigma=sigma)
        sigma_C = pm.HalfNormal('sigma_C', sigma=sigma)
        mu_C = pm.Normal('mu_C', mu=mu, sigma=sigma)

        # Likelihood (sampling distribution) of the observations:
        observed_data_C = pm.StudentT('observed_data_C', nu=nu_C, mu=mu_C, sigma=sigma_C, observed=data)

        # Draw 5000 posterior samples:
        trace_C = pm.sample(draws=5000, tune=3000, chains=2, cores=1)

        # Obtain posterior predictive samples:
        post_pred_C = pm.sample_posterior_predictive(trace_C, samples=3000)
        print(post_pred_C['observed_data_C'].shape)
        print('\nSummary: ')
        print(pm.stats.summary(data=trace_C))
        print(pm.stats.summary(data=post_pred_C))
########################################################################################################################
    return trace_C, post_pred_C

for m in range(len(S2_data_list)):
    print('\n')
    print('Accessing list number: ', m)
    # print(S2_data_list)
    # print(len(S2_data_list))
    # Initializing the PDF Distribution:
    print('#######################################################################################################')
    print('Designing the Bayesian PDF for initial Sensor 2:')
    trace_S2_0, post_pred_S2_0 = S2_0_Bayesian_Interface(data=S2_data_list[0])
    best_dist = getattr(st, 't')
    param_S2_0 = [np.mean(trace_S2_0['nu_0']), np.mean(trace_S2_0['mu_0']), np.mean(trace_S2_0['sigma_0'])]
    print('Parameters values = ', param_S2_0)
    pdf_S2_0, B_x_axis_pdf_S2_0, y_axis_pdf_S2_0 = make_pdf(data=S2_data_list[0], dist=best_dist, params=param_S2_0, size=len(trace_S2_0))
    print('#######################################################################################################')
    plt.plot(B_x_axis_pdf_S2_0, y_axis_pdf_S2_0, label='%i. step (Initial[S2])' % (m + 1))

    # Predict:
    print('#######################################################################################################')
    print('Designing the Bayesian PDF for predictive Sensor 2:')
    trace_S2_P, post_pred_S2_P = S2_P_Bayesian_Interface(nu=np.mean(trace_S2_0['nu_0']), mu=np.mean(trace_S2_0['mu_0']), sigma=np.mean(trace_S2_0['sigma_0']), data=S2_data_list[m])
    best_dist = getattr(st, 't')
    param_S2_P = [np.mean(trace_S2_P['nu_P']), np.mean(trace_S2_P['mu_P']), np.mean(trace_S2_P['sigma_P'])]
    print('Parameters values = ', param_S2_P)
    pdf_S2_P, B_x_axis_pdf_S2_P, y_axis_pdf_S2_P = make_pdf(data=S2_data_list[m], dist=best_dist, params=param_S2_P, size=len(trace_S2_P))
    print('#######################################################################################################')
    plt.plot(B_x_axis_pdf_S2_P, y_axis_pdf_S2_P, label='%i. step (Prediction[S2])' % (m + 1))

    # Correct:
    print('#######################################################################################################')
    print('Designing the Bayesian PDF for correction Sensor 1:')
    trace_S1_C, post_pred_S1_C = S1_C_Bayesian_Interface(nu=np.mean(trace_S2_P['nu_P']), mu=np.mean(trace_S2_P['mu_P']), sigma=np.mean(trace_S2_P['sigma_P']), data=S1_data_list[m])
    best_dist = getattr(st, 't')
    param_S1_C = [np.mean(trace_S1_C['nu_C']), np.mean(trace_S1_C['mu_C']), np.mean(trace_S1_C['sigma_C'])]
    print('Parameters values = ', param_S1_C)
    pdf_S1_C, B_x_axis_pdf_S1_C, y_axis_pdf_S1_C = make_pdf(data=S1_data_list[m], dist=best_dist, params=param_S1_C, size=len(trace_S1_C))
    print('#######################################################################################################')
    plt.plot(B_x_axis_pdf_S1_C, y_axis_pdf_S1_C, label='%i. step (Correction[S1])' % (m + 1))
    corr_pdf_S1 = y_axis_pdf_S1_C
    corr_pdf_S1_list.append(corr_pdf_S1)
    print('Corrected PDF: ', corr_pdf_S1)
    print('Length of Corrected PDF list = ', len(corr_pdf_S1_list))
    x_corr_pdf_list.append(B_x_axis_pdf_S1_C)

Am I approaching the idea correctly?

Using the posterior mean to initialize the model for the next step should work in principle if the posterior is Gaussian-like; in this case, you would have problems with parameters like nu and sigma. I don't have an out-of-the-box solution that I can recommend right now, as this is pretty complicated and depends highly on your model and data. But I would recommend either using a standard Kalman filter or looking at a particle-filtering-like approach; a rough sketch of the latter is below.
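For illustration only, here is a minimal bootstrap particle filter for a 1D state, assuming a Gaussian random-walk transition and Gaussian measurement noise (this is the generic algorithm, not something derived from your model):

import numpy as np

def bootstrap_particle_filter(measurements, n_particles=1000,
                              process_std=1.0, sensor_std=1.0):
    # Initialize particles from a broad prior, with uniform weights
    particles = np.random.normal(0.0, 10.0, size=n_particles)
    estimates = []
    for z in measurements:
        # Predict: propagate each particle through the transition model
        particles = particles + np.random.normal(0.0, process_std, size=n_particles)
        # Correct: weight each particle by the measurement likelihood
        weights = np.exp(-0.5 * ((z - particles) / sensor_std) ** 2)
        weights /= weights.sum()
        # Resample: draw a new particle set proportional to the weights
        idx = np.random.choice(n_particles, size=n_particles, p=weights)
        particles = particles[idx]
        estimates.append(particles.mean())
    return np.array(estimates)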

Did you mean that I would need to create equations to update the nu and sigma parameters? Also, how can I modify my model/code to work with a particle-filter approach?

I was talking to someone about this concept, and he mentioned doing Bayesian updating. How can I implement this in my code using your package?

I was searching online; would I need to modify the functions along the lines of this Stack Overflow question?

Updating model on PyMC3 with new observed data

You can have a look at the discussion here: Performance speedup for updating posterior with new data. A sketch of the pattern discussed there follows.
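The core idea in that thread (a sketch, assuming PyMC3 >= 3.7; the names here are illustrative) is to build the model once with the observations wrapped in a pm.Data container and swap new data in with pm.set_data, instead of re-creating the model at every step:

import pymc3 as pm

# Build the model once, with the observations in a pm.Data container
with pm.Model() as model:
    obs = pm.Data('obs', S2_data_list[0])
    nu = pm.HalfNormal('nu', sigma=10)
    sigma = pm.HalfNormal('sigma', sigma=10)
    mu = pm.Normal('mu', mu=0, sigma=10)
    pm.StudentT('y', nu=nu, mu=mu, sigma=sigma, observed=obs)

traces = []
for batch in S2_data_list:
    with model:
        pm.set_data({'obs': batch})  # swap in the new observations
        traces.append(pm.sample(draws=2000, tune=1000, chains=2, cores=1))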

Sorry for the late reply; I have been working on the data fusion for some time using a Bayesian filter. I am not sure if I am using Bayesian updating correctly, but I got a function from the following link:

Updating model on PyMC3 with new observed data

However, when it reaches the stage of extracting posterior information from the initial data, I keep getting an error about an unknown variable. I have modified my code to the following:

import numpy as np
import scipy.stats as st
from pymc3 import Interpolated

def from_posterior(param, samples):
    # Assigning the linspace variable:
    smin, smax = np.min(samples), np.max(samples)
    width = smax - smin
    x = np.linspace(smin, smax, 100)

    # Creating the Gaussian KDE:
    y = st.gaussian_kde(samples)(x)

    # What was never sampled should have a small probability but not 0,
    # so we extend the domain and use a linear approximation of the density on it
    x = np.concatenate([[x[0] - 3 * width], x, [x[-1] + 3 * width]])
    y = np.concatenate([[0], y, [0]])
    return Interpolated(param, x, y)
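For reference, in the linked example from_posterior is called with the variable's name and that variable's posterior samples from the previous trace, along these lines (illustrative sketch, with trace_0 being the trace returned by the initial model below):

with pm.Model() as model_P:
    # Priors rebuilt from the previous posterior samples; the first
    # argument is the name the new random variable will carry
    nu_P = from_posterior('nu_0', trace_0['nu_0'])
    sigma_P = from_posterior('sigma_0', trace_0['sigma_0'])
    mu_P = from_posterior('mu_0', trace_0['mu_0'])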

def S2_0_Bayesian_Interface(data):
########################################################################################################################
    with pm.Model() as model_0:
        # Prior distributions for the unknown model parameters:
        nu_0 = pm.HalfNormal('nu_0', sigma=10)
        sigma_0 = pm.HalfNormal('sigma_0', sigma=10)
        mu_0 = pm.Normal('mu_0', mu=0, sigma=10)

        # Likelihood (sampling distribution) of the observations:
        observed_data_0 = pm.StudentT('observed_data_0', nu=nu_0, mu=mu_0, sigma=sigma_0, observed=data)

        # Draw 5000 posterior samples:
        trace_0 = pm.sample(draws=5000, tune=3000, chains=2, cores=1)

        # Obtain posterior predictive samples:
        post_pred_0 = pm.sample_posterior_predictive(trace_0, samples=3000)
        print(post_pred_0['observed_data_0'].shape)
        print('\nSummary: ')
        print(pm.stats.summary(data=trace_0))
        print(pm.stats.summary(data=post_pred_0))
########################################################################################################################
    return trace_0, post_pred_0

def S2_P_Bayesian_Interface(nu, mu, sigma, data):
########################################################################################################################
    with pm.Model() as model_P:
        # Prior distributions built from the previous posterior:
        nu_P = from_posterior('nu_0', trace['nu_0'])
        sigma_P = from_posterior('sigma_0', trace['sigma_0'])
        mu_P = from_posterior('mu_0', trace['mu_0'])

        # Likelihood (sampling distribution) of the observations:
        observed_data_P = pm.StudentT('observed_data_P', nu=nu_P, mu=mu_P, sigma=sigma_P, observed=data)

        # Draw 5000 posterior samples:
        trace_P = pm.sample(draws=5000, tune=3000, chains=2, cores=1)

        # Obtain posterior predictive samples:
        post_pred_P = pm.sample_posterior_predictive(trace_P, samples=3000)
        print(post_pred_P['observed_data_P'].shape)
        print('\nSummary: ')
        print(pm.stats.summary(data=trace_P))
        print(pm.stats.summary(data=post_pred_P))
########################################################################################################################
    return trace_P, post_pred_P

def S1_C_Bayesian_Interface(nu, mu, sigma, data):
########################################################################################################################
    with pm.Model() as model_C:
        # Prior distributions built from the prediction model's posterior:
        nu_C = from_posterior('nu_P', trace['nu_P'])
        sigma_C = from_posterior('sigma_P', trace['sigma_P'])
        mu_C = from_posterior('mu_P', trace['mu_P'])

        # Likelihood (sampling distribution) of the observations:
        observed_data_C = pm.StudentT('observed_data_C', nu=nu_C, mu=mu_C, sigma=sigma_C, observed=data)

        # Draw 5000 posterior samples:
        trace_C = pm.sample(draws=5000, tune=3000, chains=2, cores=1)

        # Obtain posterior predictive samples:
        post_pred_C = pm.sample_posterior_predictive(trace_C, samples=3000)
        print(post_pred_C['observed_data_C'].shape)
        print('\nSummary: ')
        print(pm.stats.summary(data=trace_C))
        print(pm.stats.summary(data=post_pred_C))
########################################################################################################################
    return trace_C, post_pred_C

for m in range(len(S2_data_list)):
    print('\n')
    print('Accessing list number: ', m)
    # print(S2_data_list)
    # print(len(S2_data_list))
    # Initializing the PDF Distribution:
    print('#######################################################################################################')
    print('Designing the Bayesian PDF for initial Sensor 2:')
    trace_S2_0, post_pred_S2_0 = S2_0_Bayesian_Interface(data=S2_data_list[0])
    best_dist = getattr(st, 't')
    param_S2_0 = [np.mean(trace_S2_0['nu_0']), np.mean(trace_S2_0['mu_0']), np.mean(trace_S2_0['sigma_0'])]
    print('Parameters values = ', param_S2_0)
    pdf_S2_0, B_x_axis_pdf_S2_0, y_axis_pdf_S2_0 = make_pdf(data=S2_data_list[0], dist=best_dist, params=param_S2_0, size=len(trace_S2_0))
    print('#######################################################################################################')
    plt.plot(B_x_axis_pdf_S2_0, y_axis_pdf_S2_0, label='%i. step (Initial[S2])' % (m + 1))

    # Predict:
    print('#######################################################################################################')
    print('Designing the Bayesian PDF for predictive Sensor 2:')
    trace_S2_P, post_pred_S2_P = S2_P_Bayesian_Interface(nu=np.mean(trace_S2_0['nu_0']), mu=np.mean(trace_S2_0['mu_0']), sigma=np.mean(trace_S2_0['sigma_0']), data=S2_data_list[m])
    best_dist = getattr(st, 't')
    param_S2_P = [np.mean(trace_S2_P['nu_P']), np.mean(trace_S2_P['mu_P']), np.mean(trace_S2_P['sigma_P'])]
    print('Parameters values = ', param_S2_P)
    pdf_S2_P, B_x_axis_pdf_S2_P, y_axis_pdf_S2_P = make_pdf(data=S2_data_list[m], dist=best_dist, params=param_S2_P, size=len(trace_S2_P))
    print('#######################################################################################################')
    plt.plot(B_x_axis_pdf_S2_P, y_axis_pdf_S2_P, label='%i. step (Prediction[S2])' % (m + 1))

    # Correct:
    print('#######################################################################################################')
    print('Designing the Bayesian PDF for correction Sensor 1:')
    trace_S1_C, post_pred_S1_C = S1_C_Bayesian_Interface(nu=np.mean(trace_S2_P['nu_P']), mu=np.mean(trace_S2_P['mu_P']), sigma=np.mean(trace_S2_P['sigma_P']), data=S1_data_list[m])
    best_dist = getattr(st, 't')
    param_S1_C = [np.mean(trace_S1_C['nu_C']), np.mean(trace_S1_C['mu_C']), np.mean(trace_S1_C['sigma_C'])]
    print('Parameters values = ', param_S1_C)
    pdf_S1_C, B_x_axis_pdf_S1_C, y_axis_pdf_S1_C = make_pdf(data=S1_data_list[m], dist=best_dist, params=param_S1_C, size=len(trace_S1_C))
    print('#######################################################################################################')
    plt.plot(B_x_axis_pdf_S1_C, y_axis_pdf_S1_C, label='%i. step (Correction[S1])' % (m + 1))
    corr_pdf_S1 = y_axis_pdf_S1_C
    corr_pdf_S1_list.append(corr_pdf_S1)
    print('Corrected PDF: ', corr_pdf_S1)
    print('Length of Corrected PDF list = ', len(corr_pdf_S1_list))
    x_corr_pdf_list.append(B_x_axis_pdf_S1_C)

When it gets to the prediction stage, I keep getting:

Unknown variable nu_0

Also, a side question:

How can I speed up posterior sampling? The first time I ran the code it took around 20 seconds (if I remember correctly :sweat_smile:), but now it takes a long time to obtain the posterior distribution, around 300 seconds.
