Parallelizing chains with custom likelihood on multiple cores

Hello, I am trying to run many chains in parallel on multiple cores. I am aware of the questions that were already posted, but I could not yet solve this with the feedback that is out there.

I started with a black-box likelihood model and DEMetropolisZ sampling, and ran it while increasing the number of cores and chains. I am running this on an Ubuntu laptop with 16 cores, with PyMC v5. Here is a minimal example:

import pytensor.tensor as at
import numpy as np
import pymc as pm


# define a PyTensor Op for our likelihood function
class LogLike(at.Op):
    """
    Specify what type of object will be passed and returned to the Op when it is
    called. In our case we will be passing it a vector of values (the parameters
    that define our model) and returning a single "scalar" value (the
    log-likelihood)
    """

    itypes = [at.dvector, at.dvector]  # expects a vector of parameter values when called
    otypes = [at.dscalar]  # outputs a single scalar value (the log likelihood)

    def __init__(self, custom_function, observations, sigma):
        """
        Initialise the Op with various things that our log-likelihood function
        requires. Below are the things that are needed in this particular
        example.

        Parameters
        ----------
        custom_function:
            The log-likelihood (or whatever) function we've defined
        observations:
            The "observed" data that our log-likelihood function takes in
        sigma:
            The noise standard deviation that our function requires.
        """

        # add inputs as class attributes
        self.likelihood = custom_function
        self.data = observations
        self.sigma = sigma

    def perform(self, node, inputs, outputs):
        # the method that is used when calling the Op
        (A, B) = inputs  # this will contain my variables
        # call the log-likelihood function
        logl = self.likelihood(A, B, self.data, self.sigma)
        outputs[0][0] = np.array(logl)  # output the log-likelihood

def forward_model(A, B):
    return (A - B) ** 2
def log_likelihood(A, B, data, sigma):
    # call a forward model
    response = forward_model(A, B)
    return np.sum(np.abs(response - data))/sigma


def sample_posterior(variables=20,
                     prior_range: float = 5,
                     cores: int = 48,
                     chains: int = 48,
                     draws: int = 10000,
                     tune: int = 1000):
    dip_names = ['varA_' + str(num) for num in range(variables)]
    azimuth_names = ['varB_' + str(num) for num in range(variables)]
    with pm.Model(coords={"A": dip_names,
                          "B": azimuth_names}) as model:
        A = pm.Uniform("varA", upper=prior_range, lower=-prior_range, dims='A')
        B = pm.Uniform("varB", upper=prior_range, lower=-prior_range, dims='B')
        A = at.as_tensor_variable(A)
        B = at.as_tensor_variable(B)
        data = np.zeros((variables))

        log_l = LogLike(log_likelihood, data, sigma=1)
        pm.Deterministic("likelihood_det", log_l(A, B))
        pm.Potential("likelihood", log_l(A, B))
        idata = pm.sample(draws=draws,
                          step=pm.DEMetropolisZ(),
                          cores=cores,
                          chains=chains,
                          tune=tune,
                          progressbar=False,
                          model=model,
                          random_seed=100722,
                          discard_tuned_samples=False,
                          compute_convergence_checks=False)
        return idata


if __name__ == '__main__':
    import time

    draws = 50000
    for cores in [1, 2, 4, 8]:
        t = time.time()
        sample_posterior(variables=10,
                         prior_range=5,
                         cores=cores,
                         chains=cores,
                         tune=0,
                         draws=draws)
        print(str(cores) + ' core: elapsed time is ' + str(time.time() - t))

The output I get suggests that computation time increases linearly with cores/chains:

Python 3.11.0 | packaged by conda-forge | (main, Oct 25 2022, 06:24:40) [GCC 10.4.0]
Sequential sampling (1 chains in 1 job)
DEMetropolisZ: [varA, varB]
Sampling 1 chain for 0 tune and 50_000 draw iterations (0 + 50_000 draws total) took 5 seconds.
1 core: elapsed time is 6.117705821990967
Multiprocess sampling (2 chains in 2 jobs)
DEMetropolisZ: [varA, varB]
Sampling 2 chains for 0 tune and 50_000 draw iterations (0 + 100_000 draws total) took 9 seconds.
2 core: elapsed time is 10.2704439163208
Multiprocess sampling (4 chains in 4 jobs)
DEMetropolisZ: [varA, varB]
Sampling 4 chains for 0 tune and 50_000 draw iterations (0 + 200_000 draws total) took 18 seconds.
4 core: elapsed time is 19.379039525985718
Multiprocess sampling (8 chains in 8 jobs)
DEMetropolisZ: [varA, varB]
Sampling 8 chains for 0 tune and 50_000 draw iterations (0 + 400_000 draws total) took 38 seconds.
8 core: elapsed time is 39.8494393825531

I also tried this with a pure PyMC model and the NUTS sampler (suggested by @cluhmann) but I get the same results. Here’s the code:

import pymc as pm
import time
for cores in [1, 2, 4, 8]:
    draws = 5000
    tune = 100
    with pm.Model() as model:
        t = time.time()
        a = pm.Normal("a")
        b = pm.Normal("b", mu=a, sigma=1, observed=[1, 2, 3, 4])
        idata = pm.sample(draws=draws,
                          cores=cores,
                          chains=cores,
                          tune=tune,
                          progressbar=False)
        print(str(cores) + ' core: elapsed time is ' + str(time.time() - t))

1 core: elapsed time is 2.955948829650879
Auto-assigning NUTS sampler…
Initializing NUTS using jitter+adapt_diag…
Multiprocess sampling (2 chains in 2 jobs)
NUTS: [a]
Sampling 2 chains for 100 tune and 5_000 draw iterations (200 + 10_000 draws total) took 2 seconds.
2 core: elapsed time is 2.0471863746643066
Auto-assigning NUTS sampler…
Initializing NUTS using jitter+adapt_diag…
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [a]
Sampling 4 chains for 100 tune and 5_000 draw iterations (400 + 20_000 draws total) took 2 seconds.
4 core: elapsed time is 7.911740779876709
Auto-assigning NUTS sampler…
Initializing NUTS using jitter+adapt_diag…
Multiprocess sampling (8 chains in 8 jobs)
NUTS: [a]
Sampling 8 chains for 100 tune and 5_000 draw iterations (800 + 40_000 draws total) took 4 seconds.
8 core: elapsed time is 15.371955394744873

Thanks for the feedback!


Another thing we discussed (I think) was setting compute_convergence_checks to False.

Otherwise, @ricardoV94 do you have any suggestions?

True, we did mention that too. It seems to slow down the sampling considerably in my case, but I suppose this is expected. Still, I do not see how this would affect the parallelization. I tried it with the flag True and False and the output is the same as above.

An update from my side: I tried a few samplers and noticed that the DEMetropolis sampler actually uses all the requested cores, but not for the entirety of the sampling and not at full capacity. I ran the inference on a server with 32 cores and took a snapshot of the CPU usage, shown below. It seems like the cores are used sporadically and not at full capacity. Is this expected?

I am also wondering how the PyMC sampler assigns the sampling work across cores. I am using NUTS by default and observe results similar to the chart you shared.

I specify the number of cores in pm.sample(..., cores=8) and find that CPU utilization tracks the number of chains rather than the number of cores when the number of cores >= the number of chains (based on different pm.sample() settings with 2 chains and 4 chains). Sampling work is assigned to different cores sporadically: one core might be at 100% and in the next second its load drops or shifts entirely to another core. However, the overall CPU utilization matches the number of chains. For example, with 2 chains you might see about 200% spread across 8 cores (on an instance with 8 CPUs).

Will a more powerful instance type (i.e., one with more CPUs) help with initializing the sampling process? I am not sure about this.

By checking the CPU utilization, I find that the initialization of a sampling process seems to be handled by a single core, which can run at 100% utilization while the others remain idle.

Initially I thought that this was the best explanation, but I agree that it does not explain the example I posted above. If the chains were properly parallel, the sampling time would not change as cores and chains increase together.

I am not an expert. I guess pm.Model() might leverage multiple cores if some calculations are handled by PyTensor. When sampling sequentially, one core processes one chain at a time, but the backend calculation might trigger the use of multiple cores.

I agree with you. I also read this post but I am confused by this part, where it says this can “easily happen”. I am wondering under what circumstances each process would start 8 BLAS processes in this example.

So it can easily happen that you start 8 processes with pm.sample(cores=8), and each of those starts 8 processes using BLAS. This gives you 64 processes in total, which will really slow things down.


Specifying more cores than chains doesn’t matter, if that’s the question.

PyMC uses at most one core for each chain; it doesn’t try to somehow split the work needed for a single chain across multiple cores (other than what PyTensor automatically does for some computations, but that has nothing to do with the cores argument in sample).

About initialization: we compile the needed functions once on the main process before splitting off the worker processes for multi-core sampling. Maybe that is what you are noticing when you say initialization uses cores unevenly?

Btw, on the PyTensor side this could be helpful: Multi cores support in PyTensor — PyTensor dev documentation

About the 8x8 PyTensor cores thing… maybe it does do that? You could try setting the thread count to 1, so PyTensor/BLAS doesn’t use more cores than it should.
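Something along these lines should do it (untested sketch; which of the environment variables matters depends on the BLAS library you have installed):

import os

# limit threaded BLAS / OpenMP backends to one thread per process;
# these must be set before numpy / pytensor are imported for the first time
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"

import pymc as pm  # each worker process spawned by pm.sample now runs single-threaded BLAS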

Thanks @ricardoV94 for your response!

It is great to know that

Specifying more cores than chains doesn’t matter if that’s the question.

This confirms my guess.

PyMC uses at most one core for each chain; it doesn’t try to somehow split the work needed for a single chain across multiple cores (other than what PyTensor automatically does for some computations, but that has nothing to do with the cores argument in sample)

Yes, at the initialization stage there is one core at 100% while the others remain idle. I notice the uneven utilization of multiple cores once the sampling starts.

About initialization: we compile the needed functions once on the main process before splitting off the worker processes for multi-core sampling. Maybe that is what you are noticing when you say initialization uses cores unevenly?

Will try this… I notice that some pages on the PyTensor docs website are still under construction. I look forward to reading more about use cases and examples of PyTensor in/with PyMC.

Btw, on the PyTensor side this could be helpful: Multi cores support in PyTensor — PyTensor dev documentation

Thank you @ricardoV94 for responding to this. I still don’t get it though: if PyMC uses one core per chain, then the inference time should not increase if we set these two to the same number. So the example below should yield roughly the same computation time for every setting (which it doesn’t), except perhaps for the initialization phase, as you mention, but that should be only a minimal increment compared to the inference time.

import pymc as pm
import time
for cores in [1, 2, 4, 8]:
    draws = 5000
    tune = 100
    with pm.Model() as model:
        t = time.time()
        a = pm.Normal("a")
        b = pm.Normal("b", mu=a, sigma=1, observed=[1, 2, 3, 4])
        idata = pm.sample(draws=draws,
                          step=pm.DEMetropolisZ(), # also with NUTS the results are the same
                          cores=cores,
                          chains=cores,
                          tune=tune,
                          progressbar=False)
        print(str(cores) + ' core: elapsed time is ' + str(time.time() - t))

@shakasaki I didn’t have a chance to test your case. I’m currently on a laptop with only 2 cores :confused:

Did you try tweaking the PyTensor flags to prevent the use of multiple cores there?

~~https://pytensor.readthedocs.io/en/latest/tutorial/multi_cores.html~~

Edit: Never mind, your last model is trivial and shouldn’t involve any of these.

Hmm, your model is very simple @shakasaki. I wonder if the bottleneck is PyMC trying to store the draws / sampling stats in a common memory (I don’t remember exactly how that’s being done right now), and the sampling itself is trivially fast.

Would need to dig up a bit of that code.

The model above is just to point out the time lag I observe when invoking my custom likelihood model, which is more complicated and takes about a second to compute, so inference times go up to a few weeks for me. There I observe the same thing: more chains and cores increase the inference time linearly. Thanks for looking into it!

@shakasaki Yeah, I meant that the reason for the scaling could be completely different between this simple model (maybe I/O when writing the trace) and your complex model (maybe PyTensor multi-core Ops)

I was also thinking that this could be the reason, so I tried just “stalling” the operation in the forward model by adding a pause. I modified the black-box likelihood function from my first post to the following:

def forward_model(A, B):
    time.sleep(0.1)
    return (A - B) ** 2

Not sure if this is a valid test, but it certainly slows down the inference to hours. I ran this with 6, 12 and 18 cores on a server and still the inference time increases linearly with the number of cores. The forward model is now “slow”, so the ratio between calculation time and IO time should be high, and it seems that parallelization does not really help here. If you think of a better test I can run it on the server with up to 48 cores, to check whether parallel chains are actually running.
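If time.sleep is not a fair stand-in because the worker process sits idle instead of burning CPU, a compute-bound version of the pause could be swapped in instead. Just a sketch of what I mean:

def forward_model(A, B):
    # burn roughly 0.1 s of real CPU time per call instead of sleeping
    # (assumes the `import time` and `import numpy as np` from the script above)
    t0 = time.time()
    while time.time() - t0 < 0.1:
        np.linalg.inv(np.random.rand(100, 100) + 100 * np.eye(100))
    return (A - B) ** 2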

That’s a good test!

CC @aseyboldt any ideas?


I think the original model is so small that the parallelization doesn’t do anything useful.
If we try something a bit more costly it does:

import time

import numpy as np
import pymc as pm
import pytensor.tensor as pt

with pm.Model() as model:
    x = pm.Normal("x", shape=(50, 50))
    y = pt.linalg.solve(x + x.T, np.ones(50)).sum()
    pm.Normal("y", mu=y, observed=100.)

times = []
for cores in [1, 4, 8]:
    start = time.time()
    with model:
        pm.sample(cores=cores, chains=cores, tune=100, draws=100, compute_convergence_checks=False)
    end = time.time()
    times.append(end - start)

This gives me [68.90292978286743, 92.38360643386841, 97.97894310951233], so sampling just one chain is a bit faster than the others, but not much. (Probably because a single core doesn’t produce as much heat, so it can run faster, and because the later runs have to wait for the slowest chain to finish?)

Predicting speedups from parallelization is always a bit tricky. Maybe what helps is a bit of background about what’s going on behind the scenes:

  1. When we call pm.sample we first compile the PyTensor functions needed for the sampler. This can easily take a couple of seconds, but there is some caching going on, so usually it will be relatively fast.
  2. We start one new process per chain (not per core!), serialize the sampler functions and send them to the new processes, each of which unserializes them. The processes then just wait to be told to do something.
  3. The main process tells at most cores-many of those processes to generate a new draw, and waits for one of the worker processes to get back to it. The started worker processes all run at the same time, and call the logp_grad function a highly variable number of times (somewhere between 2 and 1000) to generate a draw, then send that draw to the main process and wait again. When the main process receives a draw it first tells that worker process to start computing the next one, then computes the deterministics and stores the draw in the trace object. It repeats this until all chains are finished (when one of the chains finishes, it is replaced by one of the (chains - cores) worker processes that are still waiting).
  4. We then compute convergence checks and return the trace object in the main thread.

If we want to find out how long the whole thing takes, it makes sense to think about where the bottleneck will be. It could actually be the first, second or last step, in which case all the parallelization won’t help that much. If it is the 3rd step, it could go two ways: if n_logp_calcs_per_draw * time(logp_calc) < n_chains * (time(calc_deterministics) + time(store_in_trace)), then the bottleneck will be the main thread, which can’t keep up with all the new draws from the worker processes. The worker processes will then just wait most of the time, and the parallelization won’t help much. Otherwise, the main thread will often wait for the draws, the worker processes will actually run in parallel most of the time, and parallelization will help.

I think that pretty much explains what we are seeing?
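To get a feeling for which side of that inequality a given model is on, one rough check (just a sketch, using the model defined above) is to time a single logp call in isolation:

import time

logp_fn = model.compile_logp()   # compiled log-probability function
point = model.initial_point()    # starting values in the transformed space

n = 100
start = time.time()
for _ in range(n):
    logp_fn(point)
print("seconds per logp call:", (time.time() - start) / n)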


Thank you @aseyboldt for the excellent answer. It explains what we see in the example you posted. Still, when I run this with a custom likelihood function (which was also the point of the original post) I run into the same issue. Here is a minimal example where I added a bit more linear algebra, because it was running too fast on my laptop. Probably I’m mistaken again and there is something else at play, but it would be great to clear this one up too, so that the original post is answered precisely. Thank you for your time!

Here is the custom likelihood example:

import numpy as np
import pymc as pm
import pytensor.tensor as pt

class LogLike(pt.Op):
    itypes = [pt.dvector]
    otypes = [pt.dscalar]

    def __init__(self, custom_function):
        self.likelihood = custom_function

    def perform(self, node, inputs, outputs):
        logl = self.likelihood(inputs[0])
        outputs[0][0] = np.array(logl)  # output the log-likelihood

def log_likelihood(A):
    x = np.random.randn(A.shape[0], A.shape[0])
    b = np.linalg.solve(x + x.T, A)
    c = np.linalg.svd(x**2)
    return -1.0


import time
draws = 100
variables = 50
times = []
for cores in [3, 6, 9, 12]:
    start = time.time()
    var_names = ['var_' + str(num) for num in range(variables)]
    with pm.Model(coords={"A": var_names}):
        A = pm.Uniform("varA", upper=1, lower=-1, dims='A')
        A = pt.as_tensor_variable(A)
        log_l = LogLike(log_likelihood)
        pm.Potential("likelihood", log_l(A))
        pm.sample(draws=draws,
                  step=pm.DEMetropolis(),
                  cores=cores,
                  chains=cores,
                  tune=draws,
                  progressbar=False,
                  discard_tuned_samples=False,
                  compute_convergence_checks=False)
    end = time.time()
    times.append(end - start)
print(times)

And from this I get:

[51.48392391204834, 96.83360600471497, 142.40000009536743, 203.06998491287231]

@shakasaki you can’t use PyTensor operations in the perform method of an Op. PyTensor does not evaluate an expression when you call something like

c = pt.linalg.svd(x)

It simply builds an object that represents that operation, which is trivially fast to do.

You should only use numpy/Python operations in your custom Op’s perform method.
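For example, a quick sketch to illustrate the difference:

import numpy as np
import pytensor.tensor as pt

x = pt.dmatrix("x")
c = pt.linalg.svd(x)   # just builds symbolic graph outputs, no SVD is computed here
print(c)               # prints PyTensor variables, not numbers

# inside perform() the inputs are already numpy arrays, so plain numpy does the actual work:
u, s, vt = np.linalg.svd(np.eye(3))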