Hello, I am trying to run many chains in parallel on multiple cores. I am aware of the questions that were already posted on this topic, but I have not been able to solve my problem with the feedback that is out there.
I started with a blackbox likelihood model and DEMetropolisZ sampling, and ran it repeatedly while increasing the number of cores and chains. I am running this on an Ubuntu laptop with 16 cores, with PyMC v5. Here is a minimal example:
```python
import numpy as np
import pymc as pm
import pytensor.tensor as at


# define a pytensor Op for our likelihood function
class LogLike(at.Op):
    """
    Specify what type of object will be passed and returned to the Op when it is
    called. In our case we will be passing it a vector of values (the parameters
    that define our model) and returning a single "scalar" value (the
    log-likelihood).
    """

    itypes = [at.dvector, at.dvector]  # expects two vectors of parameter values when called
    otypes = [at.dscalar]  # outputs a single scalar value (the log-likelihood)

    def __init__(self, custom_function, observations, sigma):
        """
        Initialise the Op with various things that our log-likelihood function
        requires. Below are the things that are needed in this particular
        example.

        Parameters
        ----------
        custom_function:
            The log-likelihood (or whatever) function we've defined
        observations:
            The "observed" data that our log-likelihood function takes in
        sigma:
            The noise standard deviation that our function requires.
        """
        # add inputs as class attributes
        self.likelihood = custom_function
        self.data = observations
        self.sigma = sigma

    def perform(self, node, inputs, outputs):
        # the method that is used when calling the Op
        (A, B) = inputs  # this will contain my variables
        # call the log-likelihood function
        logl = self.likelihood(A, B, self.data, self.sigma)
        outputs[0][0] = np.array(logl)  # output the log-likelihood


def forward_model(A, B):
    return (A - B) ** 2


def log_likelihood(A, B, data, sigma):
    # call a forward model
    response = forward_model(A, B)
    return np.sum(np.abs(response - data)) / sigma
```
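As an aside, the two plain-NumPy helpers can be sanity-checked on their own, outside of PyMC and the Op. A minimal check with made-up inputs (repeating the two functions here so the snippet runs standalone):

```python
import numpy as np

def forward_model(A, B):
    return (A - B) ** 2

def log_likelihood(A, B, data, sigma):
    response = forward_model(A, B)
    return np.sum(np.abs(response - data)) / sigma

A = np.zeros(3)
B = np.ones(3)
data = np.zeros(3)
result = log_likelihood(A, B, data, sigma=1)
print(result)  # a single scalar: (0-1)^2 = 1 per element, summed over 3 elements -> 3.0
```

This confirms the function returns the single scalar that `otypes = [at.dscalar]` promises.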
The model and the timing loop:

```python
def sample_posterior(variables: int = 20,
                     prior_range: float = 5,
                     cores: int = 48,
                     chains: int = 48,
                     draws: int = 10000,
                     tune: int = 1000):
    dip_names = ['varA_' + str(num) for num in range(variables)]
    azimuth_names = ['varB_' + str(num) for num in range(variables)]
    with pm.Model(coords={"A": dip_names,
                          "B": azimuth_names}) as model:
        A = pm.Uniform("varA", upper=prior_range, lower=-prior_range, dims='A')
        B = pm.Uniform("varB", upper=prior_range, lower=-prior_range, dims='B')
        A = at.as_tensor_variable(A)
        B = at.as_tensor_variable(B)
        data = np.zeros(variables)
        log_l = LogLike(log_likelihood, data, sigma=1)
        pm.Deterministic("likelihood_det", log_l(A, B))
        pm.Potential("likelihood", log_l(A, B))
        idata = pm.sample(draws=draws,
                          step=pm.DEMetropolisZ(),
                          cores=cores,
                          chains=chains,
                          tune=tune,
                          progressbar=False,
                          model=model,
                          random_seed=100722,
                          discard_tuned_samples=False,
                          compute_convergence_checks=False)
    return idata


if __name__ == '__main__':
    import time

    draws = 50000
    for cores in [1, 2, 4, 8]:
        t = time.time()
        sample_posterior(variables=10,
                         prior_range=5,
                         cores=cores,
                         chains=cores,
                         tune=0,
                         draws=draws)
        print(str(cores) + ' core: elapsed time is ' + str(time.time() - t))
```
The output I get suggests that the computation time increases roughly linearly with the number of cores/chains:
```
Python 3.11.0 | packaged by conda-forge | (main, Oct 25 2022, 06:24:40) [GCC 10.4.0]
Sequential sampling (1 chains in 1 job)
DEMetropolisZ: [varA, varB]
Sampling 1 chain for 0 tune and 50_000 draw iterations (0 + 50_000 draws total) took 5 seconds.
1 core: elapsed time is 6.117705821990967
Multiprocess sampling (2 chains in 2 jobs)
DEMetropolisZ: [varA, varB]
Sampling 2 chains for 0 tune and 50_000 draw iterations (0 + 100_000 draws total) took 9 seconds.
2 core: elapsed time is 10.2704439163208
Multiprocess sampling (4 chains in 4 jobs)
DEMetropolisZ: [varA, varB]
Sampling 4 chains for 0 tune and 50_000 draw iterations (0 + 200_000 draws total) took 18 seconds.
4 core: elapsed time is 19.379039525985718
Multiprocess sampling (8 chains in 8 jobs)
DEMetropolisZ: [varA, varB]
Sampling 8 chains for 0 tune and 50_000 draw iterations (0 + 400_000 draws total) took 38 seconds.
8 core: elapsed time is 39.8494393825531
```
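Normalising the wall times above by the total number of draws makes the pattern clearer: the *total* throughput stays roughly constant, as if the chains were effectively running one after another rather than in parallel.

```python
# wall times reported by pm.sample above (number of chains -> seconds)
times = {1: 5, 2: 9, 4: 18, 8: 38}
draws_per_chain = 50_000

for chains, seconds in times.items():
    total_draws = chains * draws_per_chain
    print(f"{chains} chains: {total_draws / seconds:,.0f} draws/s total, "
          f"{draws_per_chain / seconds:,.0f} draws/s per chain")
```

The total rate hovers around 10,000–11,000 draws/s whether I use 1 core or 8, while the per-chain rate drops almost exactly in proportion to the number of chains.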
I also tried this with a pure PyMC model and the NUTS sampler (suggested by @cluhmann), but I get the same results. Here's the code:
```python
import time

import pymc as pm

for cores in [1, 2, 4, 8]:
    draws = 5000
    tune = 100
    with pm.Model() as model:
        t = time.time()
        a = pm.Normal("a")
        b = pm.Normal("b", mu=a, sigma=1, observed=[1, 2, 3, 4])
        idata = pm.sample(draws=draws,
                          cores=cores,
                          chains=cores,
                          tune=tune,
                          progressbar=False)
    print(str(cores) + ' core: elapsed time is ' + str(time.time() - t))
```
```
1 core: elapsed time is 2.955948829650879
Auto-assigning NUTS sampler…
Initializing NUTS using jitter+adapt_diag…
Multiprocess sampling (2 chains in 2 jobs)
NUTS: [a]
Sampling 2 chains for 100 tune and 5_000 draw iterations (200 + 10_000 draws total) took 2 seconds.
2 core: elapsed time is 2.0471863746643066
Auto-assigning NUTS sampler…
Initializing NUTS using jitter+adapt_diag…
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [a]
Sampling 4 chains for 100 tune and 5_000 draw iterations (400 + 20_000 draws total) took 2 seconds.
4 core: elapsed time is 7.911740779876709
Auto-assigning NUTS sampler…
Initializing NUTS using jitter+adapt_diag…
Multiprocess sampling (8 chains in 8 jobs)
NUTS: [a]
Sampling 8 chains for 100 tune and 5_000 draw iterations (800 + 40_000 draws total) took 4 seconds.
8 core: elapsed time is 15.371955394744873
```
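One thing I still want to rule out (this is just a guess on my part, not something I have confirmed): BLAS/OpenMP threads inside each worker process oversubscribing the 16 cores once several chains run in parallel. Pinning each process to a single BLAS thread would look like this; the environment variables are the standard knobs for the common BLAS backends, and they have to be set before numpy/pymc are first imported:

```python
import os

# Guess, not confirmed: each chain's worker may spawn its own BLAS/OpenMP
# thread pool, so 8 chains x N threads could oversubscribe the 16 cores.
# Pin every backend to one thread per process; must run before any
# numpy / pymc import for the setting to take effect.
for var in ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS"):
    os.environ[var] = "1"

# ... only after this point: import numpy as np; import pymc as pm; ...
```

If anyone knows whether this (or something else, e.g. pickling/compile overhead per worker) explains the linear slowdown, I'd appreciate a pointer.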
Thanks for the feedback!