Running models in parallel

I’m trying to speed up computation of a larger model and I’m wondering whether I could run several model fits in parallel.
For context, the model itself is a hierarchical model with several levels (Warehouse > Product > SKU) that already uses broadcasting (via shape). To process this for 52 weeks and 3 planning scenarios, I currently run it in a larger serial loop. Are there any concerns with running this in a parallel loop instead?
I’m not super familiar with theano, so I don’t know if this could cause hard-to-detect bugs if theano doesn’t keep the parallel loops neatly separated? I hope this makes sense.

Here’s kinda what I’m thinking:

import multiprocessing
from joblib import Parallel, parallel_backend, delayed

def run_model(data, epoch):
    # get_model and process_model are my own helpers (not shown here)
    model = get_model(data)
    trace = process_model(model, epoch)
    return trace

def main_parallel(data, epochs, n_jobs=None):
    # default to one worker process per core
    if n_jobs is None:
        n_jobs = multiprocessing.cpu_count()

    # process-based backend, so each model is built in its own process
    with parallel_backend('multiprocessing'):
        traces = Parallel(n_jobs=n_jobs, verbose=10)(
            delayed(run_model)(data, epoch)
            for epoch in range(epochs)
        )
    return traces

Theano itself is not thread-safe, so you shouldn’t use Python threads for the parallelisation. multiprocessing is fine though. I’d switch off the chain parallelisation in PyMC3 (pass njobs=1 to pm.sample; newer PyMC3 releases call this argument cores). It probably doesn’t make much sense in combination with what you are doing, and using multiprocessing within multiprocessing is, I guess, asking for trouble.

Theano uses marker files in ~/.theano to prevent multiple theano processes from compiling at the same time, but I don’t think that will hurt your performance much as long as the sampling isn’t just taking a very short time.
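If waiting on that lock does turn out to matter, one commonly used workaround is to give each worker process its own compile directory, so the workers never contend for the same lock. The following is only a minimal sketch under some assumptions: the helper name use_private_theano_compiledir and the directory naming are made up for illustration, and it relies on Theano's base_compiledir config flag being read from the THEANO_FLAGS environment variable when Theano is first imported in that process.

```python
import os
import tempfile


def use_private_theano_compiledir(root=None):
    """Point this process's Theano at its own compile directory.

    Hypothetical helper: it must run BEFORE theano (or pymc3) is first
    imported in the current process, because Theano reads THEANO_FLAGS
    at import time.
    """
    root = root or tempfile.gettempdir()
    # One directory per process ID, so no two workers share a lock.
    compiledir = os.path.join(root, f"theano_worker_{os.getpid()}")
    os.makedirs(compiledir, exist_ok=True)

    # Append to any flags that are already set rather than replacing them.
    existing = os.environ.get("THEANO_FLAGS", "")
    flag = f"base_compiledir={compiledir}"
    os.environ["THEANO_FLAGS"] = f"{existing},{flag}" if existing else flag
    return compiledir
```

This would be called at the very top of the worker function, with the pymc3 import placed after it inside the worker. If the parent process has already imported Theano and the workers are forked, the setting has no effect. The trade-off is that workers no longer share a compilation cache, so each one compiles its model from scratch.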

Great feedback, I’ll give that a shot and report back. Thanks @aseyboldt !

@clausherther, did you have success with multiple processes? I’m using ray to run multiple processes in parallel, but sampling is very slow.

Using Dask with Kubernetes works just fine for me.

Hey @DanWeitzenfeld, are you using ray.remote functions or actors to run in parallel? I have an example with both, and with the functions I still get Theano compile-lock warning messages, but everything seems to run in the end. Thanks.

I am looking for help with training multiple Bayesian regression models in parallel, and I found this thread about running in parallel with Ray. @youngre, were you able to train multiple Bayesian models in parallel? Any pointers on running multiple Bayesian models in parallel would be of great help.

I am sharing the sample code -

import concurrent.futures
import time

import numpy as np
import pymc3 as pm


def generate_data():
    # synthetic linear data: y = 0.65 * x + 0.5 + noise
    N = 1000
    noise = np.random.normal(0.0, 0.1, N)
    X = np.random.normal(1.0, 0.1, N)
    obs_y = (0.65 * X) + 0.5 + noise
    return X, obs_y


def def_model(X, obs_y):
    with pm.Model() as model:
        stdev = pm.HalfNormal('stdev', sd=1.)
        intercept = pm.Normal('intercept', mu=0.0, sd=1.)
        coeff = pm.Normal('beta', mu=0.5, sd=1.)
        expected_value = (X * coeff) + intercept
        y = pm.Normal('y', mu=expected_value, sd=stdev, observed=obs_y)
    return model


def run_model(X, obs_y, run_type='mcmc'):
    model = def_model(X, obs_y)
    with model:
        if run_type == 'mcmc':
            print('running sampling method')
            # cores=1 keeps the chains sequential inside each worker process
            trace = pm.sample(1000, chains=2, cores=1)
        elif run_type == 'advi':
            print('running advi method')
            approx = pm.fit(method='advi')
            trace = approx.sample(1000)
    return 'done running combo...'


def main(X, obs_y):
    temp = []
    # one worker process per submitted model
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = [executor.submit(run_model, X, obs_y, 'advi') for _ in range(2)]
        for f in concurrent.futures.as_completed(results):
            print(f.result())
            temp.append(f.result())
    return temp


if __name__ == '__main__':
    X, obs_y = generate_data()
    start = time.perf_counter()
    temp = main(X, obs_y)
    print(temp)
    end = time.perf_counter()
    print(f'Finished in {round(end - start, 2)} second(s)')

Hey,

I ended up using Ray remote functions to parallelize the runs. I never tracked down the exact cause of the Theano compile-lock warning messages, but I did validate the results and they looked good. Hope that helps.

Thank you @youngre. I am able to run using parallel processing in Python. However, I keep getting these warnings:

INFO (theano.gof.compilelock): Waiting for existing lock by process '101746' (I am process '101747')
Waiting for existing lock by process '101746' (I am process '101747')

This happens only with parallel processing. There is no impact on the results, but is there a way to hide these warnings? They are getting printed to my logger file as well.

Regards
Vidhya shankar.
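
Since the message is emitted at INFO level by the logger named theano.gof.compilelock (both visible in the log line itself), one option is to raise that logger's level with the standard logging module. A minimal sketch; the function name quiet_theano_compilelock is made up for illustration:

```python
import logging


def quiet_theano_compilelock(level=logging.WARNING):
    """Hide INFO-level 'Waiting for existing lock' messages.

    The logger name is taken from the log line in the post above.
    Records below `level` are dropped at the logger itself, so they
    reach neither the console nor any file handler further up.
    """
    logger = logging.getLogger("theano.gof.compilelock")
    logger.setLevel(level)
    return logger
```

Logger settings are per process, so this would need to be called inside each worker (for example at the start of the function submitted to the pool), not only in the parent. It only hides the messages; the processes still wait on the lock.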

I’m compiling hierarchical models with many variables in parallel using Dask, and I am getting these same messages. For me, there seems to be a pretty significant hit to my performance because I am finding the MAP estimate ~3.5k times in a loop for a training task. Compiling the model is the slowest step of this iterative process, thus the hit to performance. If anybody has insight on how to get around this issue, I would love to hear about it. I have been trying to allow theano to finish compiling a model in a given worker with a single thread by allocating/requiring resources, but to no avail.

If it’s available to you, AWS Batch lets you sidestep the problem by making it easy to fire up multiple instances, have them run the training/fitting task, and then shut them down.

If this is something that has an established answer, could someone point me towards a tutorial? I’m trying to go the dask+kubernetes route to run inference on multiple VI models in parallel, but I would be grateful if anyone can point me towards a guide. If there isn’t one, I’d be happy to put together a tutorial if I can figure it out.