How can I deal with a computationally expensive simulator method in Sequential Monte Carlo/Approximate Bayesian Computation?

I want to do Bayesian Inference of parameters of a black-box simulator function. The issue is that the simulator is very slow, taking many seconds or minutes for a single evaluation of the model at some parameters. Limiting to a minimum viable problem situation reduces the time it takes for one evaluation of the simulator to ~milliseconds. However, an advantage is that I do not have a large number of parameters I want to infer (< 10), and not a lot of data points for each call of the simulator (~200).

At the moment I am using Approximate Bayesian Computation with Sequential Monte Carlo in a way that is similar to what is described in this example of the PyMC documentation. For the minimum viable problem, this works very well, I am getting the expected distribution for the posterior with a runtime of roughly 15 minutes on my laptop. Looking at plot_pair shows that there is a non-trivial correlation structure between the parameters:

However trying this same approach for the realistic scenarios takes prohibitively long, because the simulator evaluation is so much more expensive.

I tried to tune the number of draws and epsilon in pymc.Simulator() to reduce the runtime while still giving a meaningful result, which already helped, but not enough. Naturally, I am also working on optimising the simulation function, however it is very likely impossible to reduce the runtime fundamentally.

Doing plain MCMC sampling with Slice doesn’t give the expected results even for the minimal version of the problem. Using it for the realistic scenario takes prohibitively long.

I also tried implementing the gradient of the model via a finite difference approach, and tried the NUTS sampler for MCMC as well as the variational inference methods implemented by PyMC (ADVI, FullRankADVI, SVGD). However all of these do not converge to the expected posterior even for the minimal viable model and are prohibitively expensive for the realistic simulator scenario as well.

I asked a similar question on StackExchange, where it was recommended to use Variational Sequential Monte Carlo, which however is not directly supported by PyMC and I could not get it to work.

What are approaches that I could try to deal with such an expensive simulation function? How could I optimise the number of times this simulation function needs to be called as much as possible? Or is there an entirely different approach, which is better suited to this issue? (Apologies if I am missing something obvious, I am quite new to Bayesian inference.)

Hi @lumax and welcome!

Something that I have done in the past is to create a surrogate (meta-model, reduced order model, etc) of my function and then evaluate on that. I haven’t tried to use pymc’s GP capabilities for this in a while but I have had success using the Surrogate Modelling Toolbox since it can give you gradients on your function which gives you access to the more advanced samplers in pymc. You can also provide many of the surrogates with function gradient information if that is possible.

You will need to wrap the surrogate to use it which can be a bit tricky but from the above I think you might already be doing that already.

For what it’s worth, I’m working on getting an open source release of this idea finished but my company’s process is pretty slow so it’s a ways out.

Edit:

On the point of making a surrogate model, I’ve found it’s really important to map your parameters to a unit hypercube space or place hard bounds on them in with some other method. There be monsters outside of the calibrated space.

1 Like

Hi @NateAM
I am new in PyMC. My FEM is computationally expensive for using in Bayesian model in PyMC. As you said I need to use surrogate model. I know this topic is a bit old but I couldn’t find any refence to use appropriate surrogate models in PyMC. Do you have any idea for me to start with?

I’ve had a bit of time to work through some of this just to flesh out some of the ideas on a simple example. Note: I’m not going to argue that the sampling that is being done is of high quality here, just that we are able to sample from a surrogate. I’ve attached a pdf of a jupyter notebook where I tried out a couple of different options. This could be expanded quite a bit more if it would be helpful.

surrogate_modeling.pdf (1.8 MB)

Many thanks NateAM for sharing the file! Exactly! Trying to be able to sample from surrogate can be first approach. When the surrogate model can predict the actual model well, sampling can be modified. For sure it can help me and I would be happy if I could help to expand it!

Thank you NateAM! This is the first complete example of surrogate modelling I found for smt and PyMC. I am building my own surrogate for a very complex function but I get and undebuggable kernel crash,

Cannot execute code, session has been disposed. Please try restarting the Kernel.

The Kernel crashed while executing code in the the current cell or a previous cell. Please review the code in the cell(s) to identify a possible cause of the failure.

I mostly followed you code but adapted for my scenario (a real function with 3 real, positive definite arguments)

from smt.surrogate_models import KRG
from auxiliar import complexFunction

nparams = 3
unitSamplesTrain = pyDOE2.lhs(nparams, samples=10 * nparams, random_state=1)
unitSamplesTest = pyDOE2.lhs(nparams, samples=10 * nparams, random_state=5)

ndim = 3
ranges = {}

ranges['ep1'] = 4, 40
ranges['ep2'] = 4, 40
ranges['d'] = 0.1, 0.7

parameter_priors = [scipy.stats.uniform(loc=ranges['ep1'][0],scale=ranges['ep1'][1]), 
                    scipy.stats.uniform(loc=ranges['ep2'][0],scale=ranges['ep2'][1]), 
                    scipy.stats.uniform(loc=ranges['d'][0],scale=ranges['d'][1])]

def mapUnitToTrue(unitSamples,distributions):
    return np.vstack([distributions[i].ppf(unitSamples[:,i]) for i in range(unitSamples.shape[1])]).T
                      
def mapTrueToUnit(trueSamples,distributions):
    return np.vstack([distributions[i].cdf(trueSamples[:,i]) for i in range(trueSamples.shape[1])]).T

trueSamplesTrain = mapUnitToTrue(unitSamplesTrain, parameter_priors)
trueSamplesTest = mapUnitToTrue(unitSamplesTest, parameter_priors)
trueUnitParameters = mapTrueToUnit(np.array([eps1_in, eps2_in, d_in]).reshape((1,-1)), parameter_priors)

modelDataTrain = np.vstack([complexFunction(p) for p in trueSamplesTrain])
modelDataTest = np.vstack([complexFunction(p) for p in trueSamplesTest])

t = KRG(theta0=[1e-2]*ndim,print_prediction = False)
t.set_training_values(trueSamplesTrain, modelDataTrain)
t.train()

# # Prediction of the validation points
y = t.predict_values(trueSamplesTest)

class wrappedSurrogate(pt.Op):
    itypes = [pt.dvector]
    otypes = [pt.dscalar]

    def __init__(self, surrogate, dim):
        self.dim = dim
        self.surrogate = surrogate
        # Initialize the gradient Op
        self.valuegrad = gradWrappedSurrogate(self.surrogate, self.dim)

    def perform(self, node, inputs, outputs):
        (theta,) = inputs
        value = self.surrogate.predict_values(theta.reshape((1,self.dim))).flatten()
        outputs[0][0] = np.array(value)

    def grad(self, inputs, g):
        (theta,) = inputs
        return [g[0] * self.valuegrad(theta)]
    
class gradWrappedSurrogate(pt.Op):
    itypes = [pt.dvector]
    otypes = [pt.dvector]

    def __init__(self, surrogate, dim):
        self.dim = dim
        self.surrogate = surrogate

    def perform(self, node, inputs, outputs):
        (theta,) = inputs
        # outputs[0][0] = np.hstack([self.surrogate.predict_derivatives(theta.reshape((1,self.dim)),i).flatten() for i in range(self.dim)]).flatten()

wrapped_surrogates = wrappedSurrogate(t, 1) # maybe wrappedSurrogate(t, 3)?

#surrogate class unitTest - everything works OK!
inputTest = unitSamplesTrain[0]
wrapped_surrogates.surrogate.predict_values(inputTest.reshape(-1,3)) #(theta.reshape((1,self.dim)))
t.predict_derivatives(unitSamplesTrain[0].reshape((1,-1)),0).flatten()

with pymc.Model() as model:
    # Priors
    unitParametersPrior = pymc.Uniform('unitParametersPrior', lower=0, upper=1, shape=(3,))
    _noise = pymc.HalfNormal('noise',sigma=0.5,shape=(3,))

    # Surrogate responses
    amp1 = wrapped_surrogates(unitParametersPrior)
    # amp2 = wrapped_surrogates[1](_unitParameters)
    # amp3 = wrapped_surrogates[2](_unitParameters)

    # Likelihoods
    _l1 = pymc.Normal('likelihood_1', mu=amp1, sigma=_noise[0], observed=y)
    # _l2 = pymc.Normal('likelihood_2', mu=amp2, sigma=_noise[1], observed=y_noise_components[large_amplitudes[1]])
    # _l3 = pymc.Normal('likelihood_3', mu=amp3, sigma=_noise[2], observed=y_noise_components[large_amplitudes[2]])

with model:
    trace = pymc.sample(3000,tune=1000)

I didn’t modify the class wrapping at all since I don’t understand it well enough. As far as I understand, in your example there were three random variables modeled by three different surrogate models. I am trying to use a single surrogate model which takes as input three variables.

Any feedback is welcomed.

Thank you again!

Hi @verderis, sorry I didn’t respond on the other thread. I meant to and then it fell out of my head.

In my original example the surrogates are scalar valued vector functions so I think iit should work right out of the box for your case.

Do you know where it fails? I think where you are wrapping the surrogate (as you commented out) you should call it as:

wrapped_surrogates = wrappedSurrogate(t, nparams)

where

ndim = nparams = 3

You’ve commented out the definition of the gradient output as well which I imagine is (or will) cause problems.

You also define _noise as a vector when you only need one value. You should change that definition to

_noise = pymc.HalfNormal('noise',sigma=0.5,shape=(3,))

How does it work after making those changes?

Hi @NateAM! You don’t need to apologize, thank you for the feedback. There is not a lot people to ask for advise in such a niche topic, so your advice is really appreciated.

I really don’t know where it fails, since no error is informed and the kernel crashes. I apply both of your corrections:

nparams = 3
ndims = 3
wrapped_surrogates = wrappedSurrogate(t, nparams)
_noise = pymc.HalfNormal('noise',sigma=0.5,shape=(3,))

But the kernel crashes anyway (I also uncommented the gradient to check). I am able both to train the surrogate, and run basic tests on it, like prediction and derivative computation (I don’t know how to test the valuegrad method). Bot in the PyMC loop it crashes.

I was checking your original formulation, and noted that you use as observed = y_noise_components[large_amplitudes[0]], so I modified the code to,

with pymc.Model() as model:
    # Priors
    unitParametersPrior = pymc.Uniform('unitParametersPrior', lower=0, upper=1, shape=(3,))
    _noise = pymc.HalfNormal('noise',sigma=0.5,shape=(3,))

    # Surrogate responses
    amp1 = wrapped_surrogates(unitParametersPrior)

    # Likelihoods
    _l1 = pymc.Normal('likelihood_1', mu=amp1, sigma=_noise[0], observed=y[0])


with model:
    trace = pymc.sample(3000,tune=1000)

So now observed is shape (1,). Now the kernel doesn’t crash but the error is also cryptic (for me),

     34 with model:
---> 35     trace = pymc.sample(3000,tune=1000)

    764 _print_step_hierarchy(step)
    765 try:
--> 766     _mp_sample(**sample_args, **parallel_args)
    767 except pickle.PickleError:
    768     _log.warning("Could not pickle model, sampling singlethreaded.")

File ~/miniconda3/envs/smt/lib/python3.11/site-packages/pymc/sampling/mcmc.py:1155, in _mp_sample(draws, tune, step, chains, cores, random_seed, start, progressbar, traces, model, callback, mp_ctx, **kwargs)
   1153 try:
   1154     with sampler:
-> 1155         for draw in sampler:
   1156             strace = traces[draw.chain]
   1157             strace.record(draw.point, draw.stats)

File ~/miniconda3/envs/smt/lib/python3.11/site-packages/pymc/sampling/parallel.py:448, in ParallelSampler.__iter__(self)
    445     self._progress.update(self._total_draws)
    447 while self._active:
...
--> 378     chunk = read(handle, remaining)
    379     n = len(chunk)
    380     if n == 0:

ConnectionResetError: [Errno 104] Connection reset by peer

The only difference between your formulation and the one I am trying is that you use different surrogate models for different variables and I use just one with three inputs

It looks like I had a typo in my previous response. The noise definition should be a scalar, not a vector of length 3. I don’t think that will fix the problem, but it is something that will need to change.

I don’t know exactly how to pass on the observed value in your case. Do you have one observed value or several? If several, you probably want to pass in your full vector.

You’re running on Linux? Can you run the example I provided with no issues?

Hi @NateAM. Thank you again for the feedback. I am working on Ubuntu 22.04.2 LTS and PyMC 5.5.0.

I corrected the typo but got the same error. Your example worked just fine of of the box.

I wrote everything again with a dummy function and got the same error, so it is not related to the complex function. I copy the code below,

import numpy as np
import scipy.stats
import pymc
import arviz
import pytensor
import pyDOE2
from smt.surrogate_models import KRG #, KPLSK, GEKPLS, MGP, LS, QP, KPLS, 
from smt.utils import compute_rms_error

eps1_in = 10
eps2_in = 20
d_in = 0.2

def prueba1(params):
    '''
    Dummy version
    '''
    eps1, eps2, d = params
    return eps1+eps2+d  

y = prueba1([eps1_in, eps2_in, d_in])

nparams = 3
ndim = 3

unitSamplesTrain = pyDOE2.lhs(nparams, samples=10 * nparams, random_state=1)
unitSamplesTest = pyDOE2.lhs(nparams, samples=10 * nparams, random_state=5)

ranges = {}
ranges['ep1'] = 4, 40
ranges['ep2'] = 4, 40
ranges['d'] = 0.1, 0.7

parameter_priors = [scipy.stats.uniform(loc=ranges['ep1'][0],scale=ranges['ep1'][1]), 
                    scipy.stats.uniform(loc=ranges['ep2'][0],scale=ranges['ep2'][1]), 
                    scipy.stats.uniform(loc=ranges['d'][0],scale=ranges['d'][1])]

def mapUnitToTrue(unitSamples,distributions):
    return np.vstack([distributions[i].ppf(unitSamples[:,i]) for i in range(unitSamples.shape[1])]).T
                      
def mapTrueToUnit(trueSamples,distributions):
    return np.vstack([distributions[i].cdf(trueSamples[:,i]) for i in range(trueSamples.shape[1])]).T

trueSamplesTrain = mapUnitToTrue(unitSamplesTrain, parameter_priors)
trueSamplesTest = mapUnitToTrue(unitSamplesTest, parameter_priors)
trueUnitParameters = mapTrueToUnit(np.array([eps1_in, eps2_in, d_in]).reshape((1,-1)), parameter_priors)

modelDataTrain = np.vstack([prueba1(p) for p in trueSamplesTrain])
modelDataTest = np.vstack([prueba1(p) for p in trueSamplesTest])

t = KRG(theta0=[1e-2]*ndim,print_prediction = False)
t.set_training_values(trueSamplesTrain, modelDataTrain)
t.train()

# # Prediction of the validation points
y = t.predict_values(trueSamplesTest)
print('Kriging, err: '+ str(compute_rms_error(t, trueSamplesTest, modelDataTest)))

class wrappedSurrogate(pt.Op):
    itypes = [pt.dvector]
    otypes = [pt.dscalar]

    def __init__(self, surrogate, dim):
        self.dim = dim
        self.surrogate = surrogate
        # Initialize the gradient Op
        self.valuegrad = gradWrappedSurrogate(self.surrogate, self.dim)

    def perform(self, node, inputs, outputs):
        (theta,) = inputs
        # print(theta.reshape((-1,self.dim)).shape)
        # value = self.surrogate.predict_values(theta.reshape((1,self.dim))).flatten()
        value = self.surrogate.predict_values(theta.reshape((-1,3))).flatten() #alternative version
        outputs[0][0] = np.array(value)

    def grad(self, inputs, g):
        (theta,) = inputs
        return [g[0] * self.valuegrad(theta)]
    
class gradWrappedSurrogate(pt.Op):
    itypes = [pt.dvector]
    otypes = [pt.dvector]

    def __init__(self, surrogate, dim):
        self.dim = dim
        self.surrogate = surrogate

    def perform(self, node, inputs, outputs):
        (theta,) = inputs
        outputs[0][0] = np.hstack([self.surrogate.predict_derivatives(theta.reshape((1,self.dim)),i).flatten() for i in range(self.dim)]).flatten()

wrapped_surrogates = wrappedSurrogate(t, ndim)

#tests
inputTest = unitSamplesTrain[0]
wrapped_surrogates.surrogate.predict_values(inputTest.reshape(-1,3)) #(theta.reshape((1,self.dim)))
t.predict_derivatives(unitSamplesTrain[0].reshape((1,-1)),0).flatten()

with pymc.Model() as model:
    # Priors
    unitParametersPrior = pymc.Uniform('unitParametersPrior', lower=0, upper=1, shape=(3,))
    _noise = pymc.HalfNormal('noise',sigma=0.5,shape=(1,))

    # Surrogate responses
    amp1 = wrapped_surrogates(unitParametersPrior)

    # Likelihoods
    _l1 = pymc.Normal('likelihood_1', mu=amp1, sigma=_noise, observed=y[0])

with model:
    trace = pymc.sample(3000,tune=1000)

This code produces no kernel crash but the same error as before

Can you try sampling with a single CPU? I’ll see if I can test out your code on a local machine to see if I can ID the problem as well.

Hi @NateAM! Thank you again for answering!

I’ve been trying to further develop a minimalist version of your approach in order to tackle where the problem is. is just a dummy function with one parameter.

from smt.surrogate_models import KRG 
import numpy as np
import pymc as pm

def prueba1(params):
    '''
    Dummy version
    '''
    eps1 = params[0]
    return eps1*2  

# data generation
eps1_in = 0.27
y = prueba1([eps1_in])

nparams = 1
unitSamplesTrain = pyDOE2.lhs(nparams, samples=10 * nparams, random_state=1)
unitSamplesTest = pyDOE2.lhs(nparams, samples=10 * nparams, random_state=5)

ranges = {}
ranges['ep1'] = 0.1, 1.0
parameter_priors = [scipy.stats.uniform(loc=ranges['ep1'][0],scale=ranges['ep1'][1])]

def mapUnitToTrue(unitSamples,distributions):
    return np.vstack([distributions[i].ppf(unitSamples[:,i]) for i in range(unitSamples.shape[1])]).T
                      
def mapTrueToUnit(trueSamples,distributions):
    return np.vstack([distributions[i].cdf(trueSamples[:,i]) for i in range(trueSamples.shape[1])]).T

trueSamplesTrain = mapUnitToTrue(unitSamplesTrain, parameter_priors)
trueSamplesTest = mapUnitToTrue(unitSamplesTest, parameter_priors)
trueUnitParameters = mapTrueToUnit(np.array([eps1_in]).reshape((1,-1)), parameter_priors)

modelDataTrain = np.vstack([prueba1(p) for p in trueSamplesTrain])
modelDataTest = np.vstack([prueba1(p) for p in trueSamplesTest])

ndim = 1
t = KRG(theta0=[1e-2]*ndim,print_prediction = False)
t.set_training_values(trueSamplesTrain, modelDataTrain)
t.train()

#Original definition
class wrappedSurrogate(pt.Op):
    itypes = [pt.dvector]
    otypes = [pt.dscalar]
    def __init__(self, surrogate, dim):
        self.dim = dim
        self.surrogate = surrogate
        # Initialize the gradient Op
        self.valuegrad = gradWrappedSurrogate(self.surrogate, self.dim)
    
    def perform(self, node, inputs, outputs):
        (theta,) = inputs
        value = self.surrogate.predict_values(theta.reshape((1,self.dim))).flatten()
        
        outputs[0][0] = np.array(value)
    def grad(self, inputs, g):
        (theta,) = inputs
        return [g[0] * self.valuegrad(theta)]

class gradWrappedSurrogate(pt.Op):
    itypes = [pt.dvector]
    otypes = [pt.dvector]
    def __init__(self, surrogate, dim):
        self.dim = dim
        self.surrogate = surrogate
    def perform(self, node, inputs, outputs):
        (theta,) = inputs
        outputs[0][0] = np.hstack([self.surrogate.predict_derivatives(theta.reshape((1,self.dim)),i).flatten() for i in range(self.dim)]).flatten()

wrapped_surrogates = wrappedSurrogate(t, 1)

# testing of some class methods
inputTest = unitSamplesTrain[0]
wrapped_surrogates.surrogate.predict_values(inputTest) #(theta.reshape((1,self.dim)))
t.predict_derivatives(unitSamplesTrain[0].reshape((1,-1)),0).flatten()

with pm.Model() as model:
    # Priors
    _unitParameters = pm.Uniform('unitParameters', lower=0, upper=1, shape=(1,))
    _noise = pm.HalfNormal('noise',sigma=0.5,shape=(1,))

    # Surrogate responses
    amp1 = wrapped_surrogates(_unitParameters)
    # amp2 = wrapped_surrogates[1](_unitParameters)
    # amp3 = wrapped_surrogates[2](_unitParameters)

    # Likelihoods
    _l1 = pm.Normal('likelihood_1', mu=amp1, sigma=_noise[0], observed=y[0])

with model:
    trace = pm.sample(cores = 1)

I tested many things. If cores = 1, the kernel crashes. If I run pm.sample(), this error raises,

ConnectionResetError Traceback (most recent call last) Cell In[58], line 14 

_l1 = pm.Normal('likelihood_1', mu=amp1, sigma=_noise[0], observed=y[0]) 13 with model: 

---> 14 trace = pm.sample() in sample(draws, tune, chains, cores, random_seed, progressbar, step, nuts_sampler, initvals, init, jitter_max_retries, n_init, trace, discard_tuned_samples, compute_convergence_checks, keep_warning_stat, return_inferencedata, idata_kwargs, nuts_sampler_kwargs, callback, mp_ctx, model, **kwargs) 

--> 766 _mp_sample(**sample_args, **parallel_args) 
in _mp_sample(draws, tune, step, chains, cores, random_seed, start, progressbar, traces, model, callback, mp_ctx, **kwargs) 1153 try: 1154 with sampler: -> 1155 for draw in sampler: 1156 strace = traces[draw.chain] 1157 strace.record(draw.point, draw.stats) 

 in ParallelSampler.__iter__(self) 445 self._progress.update(self._total_draws) 447 while self._active: --> 448 draw = ProcessAdapter.recv_draw(self._active)
--> 378 chunk = read(handle, remaining) 379 n = len(chunk) 380 if n == 0: 

ConnectionResetError: [Errno 104] Connection reset by peer

This is very confusing. I don’t understand why the 1-demensional version does not work directly, nor the single CPU vs parallel behavior.

Any idea of what to test next? Do you have any suggestion about what to read in order to better understand the interface wrappedSurrogate? I’ve been trying to develop my own version but I don’t know PyMC internals well enough.

Thank you again!

Hi @NateAM! First, thank you again for your previous feedback.
I am still stuck triny to implement your approach to my problem. Speciffically, when used to estimate in a surrogate model of the form y=f(x1, x2, x3) your definition of wrapperSurrogate doesn’t work (the kernel crashes). Since no feedback is produced, it is very difficult to debug.

I tried to rebuild your line of though by following your previous threads (here) for Guassian Process, but I couldn’t adapt it to my problem. I need to better understand which methods are needed to implement in order to PyMC to be able to sample from any surrogate model from any input shape. Is there some documentation I can read? I’ve seen some feedback from @junpenglao at the old thread. any feedback is welcomed.

Hi @verderis ,

I took a look at the code and the solution is to change

# Likelihoods
_l1 = pm.Normal('likelihood_1', mu=amp1, sigma=_noise[0], observed=y[0])

to

# Likelihoods
_l1 = pm.Normal('likelihood_1', mu=amp1, sigma=_noise[0], observed=y)

The problem is that y[0] is a float and the output of amp1 is a vector. The error message is incredibly opaque here.

The error message above sounds like a generic multiprocessing error. Usually setting chains=1 allows you to see the actual error message

Thank you @NateAM ! It worked!
It is indeed an obscure error.

Since my application was for ndim = 3 (y=f(x1,x2,x3)), I leave below the code for that case. I am just struggling to define different priors for different parameters, but I think I will be able to work around it using something like (priors = [_unitParameters1, _unitParameters2, _unitParameters3] and then priors = pt.stack(priors)[:, None]) or use normalized variables. Also, It runs only with the observed y as a float (if a use a vector the kernel crashes). So at this point I cannot run a vector of observations (maybe this is a limitation of the surrogate model itself)

from smt.surrogate_models import KRG
import numpy as np
import pymc as pm
import pyDOE2
import scipy.stats
import pytensor.tensor as pt
from smt.utils import compute_rms_error
from matplotlib import pylab as plt

def test1(params):
    '''
    Dummy version
    '''
    eps1, eps2, d = params
    return eps1+eps2+d  

eps1_in, eps2_in, d_in = 10, 20, 0.5
 
ndim = 3
nparams = 3

unitSamplesTrain = pyDOE2.lhs(nparams, samples=10 * nparams, random_state=1)
unitSamplesTest = pyDOE2.lhs(nparams, samples=10 * nparams, random_state=5)

ranges = {}
ranges['ep1'] = 0, 1
ranges['ep2'] = 0, 1
ranges['d'] = 0, 1

parameter_priors = [scipy.stats.uniform(loc=ranges['ep1'][0],scale=ranges['ep1'][1]), 
                    scipy.stats.uniform(loc=ranges['ep2'][0],scale=ranges['ep2'][1]), 
                    scipy.stats.uniform(loc=ranges['d'][0],scale=ranges['d'][1])]

def mapUnitToTrue(unitSamples,distributions):
    return np.vstack([distributions[i].ppf(unitSamples[:,i]) for i in range(unitSamples.shape[1])]).T
                      
def mapTrueToUnit(trueSamples,distributions):
    return np.vstack([distributions[i].cdf(trueSamples[:,i]) for i in range(trueSamples.shape[1])]).T

trueSamplesTrain = mapUnitToTrue(unitSamplesTrain, parameter_priors)
trueSamplesTest = mapUnitToTrue(unitSamplesTest, parameter_priors)
trueUnitParameters = mapTrueToUnit(np.array([eps1_in, eps2_in, d_in]).reshape((1,-1)), parameter_priors)

modelDataTrain = np.vstack([test1(p) for p in trueSamplesTrain])
modelDataTest = np.vstack([test1(p) for p in trueSamplesTest])

t = KRG(theta0=[1e-2]*ndim,print_prediction = False)
t.set_training_values(trueSamplesTrain, modelDataTrain)
t.train()

#surrogate model visual validation
val_y = t.predict_values(trueSamplesTest)
print('Kriging, err: '+ str(compute_rms_error(t, trueSamplesTest, modelDataTest)))

fig = plt.figure()
plt.plot(modelDataTest, modelDataTest, '-', label='$y_{true}$')
plt.plot(modelDataTest, val_y, 'r.', label='$\hat{y}$')
plt.xlabel('$y_{true}$')
plt.ylabel('$\hat{y}$')
plt.title('Kriging model: validation of the prediction model')

#WS Class definition
class wrappedSurrogate(pt.Op):
    itypes = [pt.dvector]
    otypes = [pt.dscalar]
    def __init__(self, surrogate, dim):
        self.dim = dim
        self.surrogate = surrogate
        # Initialize the gradient Op
        self.valuegrad = gradWrappedSurrogate(self.surrogate, self.dim)
    
    def perform(self, node, inputs, outputs):
        (theta,) = inputs
        value = self.surrogate.predict_values(theta.reshape((1,self.dim))).flatten()
        
        outputs[0][0] = np.array(value)
    def grad(self, inputs, g):
        (theta,) = inputs
        return [g[0] * self.valuegrad(theta)]

class gradWrappedSurrogate(pt.Op):
    itypes = [pt.dvector]
    otypes = [pt.dvector]
    def __init__(self, surrogate, dim):
        self.dim = dim
        self.surrogate = surrogate
    def perform(self, node, inputs, outputs):
        (theta,) = inputs
        outputs[0][0] = np.hstack([self.surrogate.predict_derivatives(theta.reshape((1,self.dim)),i).flatten() for i in range(self.dim)]).flatten()

# surrogate definition
wrapped_surrogates = wrappedSurrogate(t, ndim)

# testing of some class methods
inputTest = unitSamplesTrain[0].reshape((1,ndim))
print(wrapped_surrogates.surrogate.predict_values(inputTest)) #(theta.reshape((1,self.dim)))
print(t.predict_derivatives(inputTest,0).flatten())

#observed value
y = 3

with pm.Model() as model:
    # Priors
    _unitParameters = pm.Uniform('unitParameters', lower=0, upper=1, shape=(3,))
    _noise = pm.HalfNormal('noise',sigma=0.5)

    # Surrogate responses
    amp1 = wrapped_surrogates(_unitParameters)

    # Likelihoods
    _l1 = pm.Normal('likelihood_1', mu=amp1, sigma=_noise, observed=y)

with model:
    trace = pm.sample()

Arriving late but it sounds like GitHub - acerbilab/pyvbmc: PyVBMC: Variational Bayesian Monte Carlo algorithm for posterior and model inference in Python may be useful in your case, which is developed specifically for Bayesian inference with an expensive likelihood function.