NUTS and parallelized black box likelihood

Hi,

I have a very complicated and computationally expensive log-likelihood function from which I am attempting to generate a posterior using NUTS. To make the log-likelihood (and gradient) calculations computationally feasible, I use multiprocessing pools inside the likelihood. When I sample, I get the following error: AssertionError: daemonic processes are not allowed to have children (full traceback attached below). In my likelihood function I have already implemented the non-daemonic pool workaround from https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic; specifically, I use this code for the pools:

import multiprocessing
import multiprocessing.pool


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False

    def _set_daemon(self, value):
        pass

    daemon = property(_get_daemon, _set_daemon)


# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess
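
For context, the pool is created inside the likelihood itself, roughly along these lines (a schematic sketch only; _branch_loglike_and_grad is a made-up stand-in for the real per-branch worker in get_log_likelihood_score_and_grad):

from itertools import repeat

import numpy as np


def get_log_likelihood_score_and_grad(times, muts, k_off, k_on, k_mut):
    # schematic: fan the per-observation work out over the non-daemonic pool,
    # then reduce to a total log-likelihood and its gradient
    with MyPool() as pool:
        results = pool.starmap(_branch_loglike_and_grad,   # placeholder worker
                               zip(times, muts, repeat((k_off, k_on, k_mut))))
    score = sum(s for s, _ in results)
    grad = np.sum([g for _, g in results], axis=0)
    return score, grad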

However, this has had no effect: NUTS still fails when using multiple cores, so clearly I don't understand the problem. Is it possible to circumvent the error while still using multiple cores for NUTS? Using trace = pm.sample(cores=1) of course avoids it. Here is my code.

import numpy as np
from numpy import exp, log
import pymc3 as pm
import theano.tensor as tt


class LogLike(tt.Op):
    # Theano Op wrapping the black-box log-likelihood and its gradient
    itypes = [tt.dvector]   # input: vector of three log-parameters
    otypes = [tt.dscalar]   # output: scalar log-likelihood

    def __init__(self, loglike, times, muts):
        self.likelihood = loglike
        self.times = times
        self.muts = muts
        self.loglike_grad = []
        self.score = 0

    def perform(self, node, inputs, outputs):
        theta, = inputs
        self.score, self.loglike_grad = self.likelihood(self.times,
                                                        self.muts,
                                                        exp(theta[0]),
                                                        exp(theta[1]),
                                                        exp(theta[2]))
        outputs[0][0] = np.array(self.score)

    def grad(self, inputs, g):
        # chain rule: scale the cached gradient by the upstream gradient
        return [g[0] * self.loglike_grad]

loglikeobj = LogLike(get_log_likelihood_score_and_grad, times, muts)
ndraws = 100
nburn = 10

max_ln_rate = log(min(times))
with pm.Model():
    #  Use uniform logarithmic priors.
    ln_off = pm.Uniform('h0', lower=-10, upper=max_ln_rate)
    ln_on = pm.Uniform('h1', lower=-10, upper=max_ln_rate)
    ln_mut = pm.Uniform('h2', lower=-10, upper=max_ln_rate)
    theta = tt.as_tensor_variable([ln_off, ln_on, ln_mut])
    
    pm.DensityDist('likelihood',
                   lambda v: loglikeobj(v),
                   observed={'v': theta})
    trace = pm.sample()

And this is the Traceback.

RemoteTraceback: 
"""
Traceback (most recent call last):
  File "//anaconda3/lib/python3.7/site-packages/theano/compile/function_module.py", line 903, in __call__
    self.fn() if output_subset is None else\
  File "//anaconda3/lib/python3.7/site-packages/theano/gof/op.py", line 892, in rval
    r = p(n, [x[0] for x in i], o)
  File "<ipython-input-5-d497c1ff46d3>", line 18, in perform
    exp(theta[2]))
  File "/Users/zach/Documents/BitBucket/phylogenyinference/telegraph_likelihood_no_class.py", line 439, in get_log_likelihood_score_and_grad
    pool = MyPool()
  File "//anaconda3/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
    self._repopulate_pool()
  File "//anaconda3/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
    w.start()
  File "//anaconda3/lib/python3.7/multiprocessing/process.py", line 110, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "//anaconda3/lib/python3.7/site-packages/pymc3/parallel_sampling.py", line 110, in run
    self._start_loop()
  File "//anaconda3/lib/python3.7/site-packages/pymc3/parallel_sampling.py", line 160, in _start_loop
    point, stats = self._compute_point()
  File "//anaconda3/lib/python3.7/site-packages/pymc3/parallel_sampling.py", line 191, in _compute_point
    point, stats = self._step_method.step(self._point)
  File "//anaconda3/lib/python3.7/site-packages/pymc3/step_methods/arraystep.py", line 247, in step
    apoint, stats = self.astep(array)
  File "//anaconda3/lib/python3.7/site-packages/pymc3/step_methods/hmc/base_hmc.py", line 122, in astep
    start = self.integrator.compute_state(q0, p0)
  File "//anaconda3/lib/python3.7/site-packages/pymc3/step_methods/hmc/integration.py", line 29, in compute_state
    logp, dlogp = self._logp_dlogp_func(q)
  File "//anaconda3/lib/python3.7/site-packages/pymc3/model.py", line 492, in __call__
    logp, dlogp = self._theano_function(array)
  File "//anaconda3/lib/python3.7/site-packages/theano/compile/function_module.py", line 917, in __call__
    storage_map=getattr(self.fn, 'storage_map', None))
  File "//anaconda3/lib/python3.7/site-packages/theano/gof/link.py", line 325, in raise_with_op
    reraise(exc_type, exc_value, exc_trace)
  File "//anaconda3/lib/python3.7/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "//anaconda3/lib/python3.7/site-packages/theano/compile/function_module.py", line 903, in __call__
    self.fn() if output_subset is None else\
  File "//anaconda3/lib/python3.7/site-packages/theano/gof/op.py", line 892, in rval
    r = p(n, [x[0] for x in i], o)
  File "<ipython-input-5-d497c1ff46d3>", line 18, in perform
    exp(theta[2]))
  File "/Users/zach/Documents/BitBucket/phylogenyinference/telegraph_likelihood_no_class.py", line 439, in get_log_likelihood_score_and_grad
    pool = MyPool()
  File "//anaconda3/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
    self._repopulate_pool()
  File "//anaconda3/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
    w.start()
  File "//anaconda3/lib/python3.7/multiprocessing/process.py", line 110, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
Apply node that caused the error: <__main__.LogLike object at 0x1c19633240>(MakeVector{dtype='float64'}.0)
Toposort index: 34
Inputs types: [TensorType(float64, vector)]
Inputs shapes: [(3,)]
Inputs strides: [(8,)]
Inputs values: [array([-4.15705917, -2.85895304, -6.63955919])]
Outputs clients: [[MakeVector{dtype='float64'}(__logp_h0_interval__, __logp_h1_interval__, __logp_h2_interval__, __logp_likelihood)]]

Backtrace when the node is created(use Theano flag traceback.limit=N to make it longer):
  File "//anaconda3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3248, in run_ast_nodes
    if (await self.run_code(code, result,  async_=asy)):
  File "//anaconda3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3325, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-5-d497c1ff46d3>", line 38, in <module>
    observed={'v': theta})
  File "//anaconda3/lib/python3.7/site-packages/pymc3/distributions/distribution.py", line 46, in __new__
    return model.Var(name, dist, data, total_size)
  File "//anaconda3/lib/python3.7/site-packages/pymc3/model.py", line 845, in Var
    total_size=total_size, model=self)
  File "//anaconda3/lib/python3.7/site-packages/pymc3/model.py", line 1447, in __init__
    self.logp_sum_unscaledt = distribution.logp_sum(**self.data)
  File "//anaconda3/lib/python3.7/site-packages/pymc3/distributions/distribution.py", line 123, in logp_sum
    return tt.sum(self.logp(*args, **kwargs))
  File "<ipython-input-5-d497c1ff46d3>", line 37, in <lambda>
    lambda v: loglikeobj(v),

HINT: Use the Theano flag 'exception_verbosity=high' for a debugprint and storage map footprint of this apply node.
"""

The above exception was the direct cause of the following exception:

AssertionError                            Traceback (most recent call last)
AssertionError: daemonic processes are not allowed to have children
Apply node that caused the error: <__main__.LogLike object at 0x1c19633240>(MakeVector{dtype='float64'}.0)
Toposort index: 34
Inputs types: [TensorType(float64, vector)]
Inputs shapes: [(3,)]
Inputs strides: [(8,)]
Inputs values: [array([-4.15705917, -2.85895304, -6.63955919])]
Outputs clients: [[MakeVector{dtype='float64'}(__logp_h0_interval__, __logp_h1_interval__, __logp_h2_interval__, __logp_likelihood)]]

Backtrace when the node is created(use Theano flag traceback.limit=N to make it longer):
  File "//anaconda3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3248, in run_ast_nodes
    if (await self.run_code(code, result,  async_=asy)):
  File "//anaconda3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3325, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-5-d497c1ff46d3>", line 38, in <module>
    observed={'v': theta})
  File "//anaconda3/lib/python3.7/site-packages/pymc3/distributions/distribution.py", line 46, in __new__
    return model.Var(name, dist, data, total_size)
  File "//anaconda3/lib/python3.7/site-packages/pymc3/model.py", line 845, in Var
    total_size=total_size, model=self)
  File "//anaconda3/lib/python3.7/site-packages/pymc3/model.py", line 1447, in __init__
    self.logp_sum_unscaledt = distribution.logp_sum(**self.data)
  File "//anaconda3/lib/python3.7/site-packages/pymc3/distributions/distribution.py", line 123, in logp_sum
    return tt.sum(self.logp(*args, **kwargs))
  File "<ipython-input-5-d497c1ff46d3>", line 37, in <lambda>
    lambda v: loglikeobj(v),

HINT: Use the Theano flag 'exception_verbosity=high' for a debugprint and storage map footprint of this apply node.

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
<ipython-input-5-d497c1ff46d3> in <module>
     37                    lambda v: loglikeobj(v),
     38                    observed={'v': theta})
---> 39     trace = pm.sample()

//anaconda3/lib/python3.7/site-packages/pymc3/sampling.py in sample(draws, step, init, n_init, start, trace, chain_idx, chains, cores, tune, progressbar, model, random_seed, discard_tuned_samples, compute_convergence_checks, **kwargs)
    435             _print_step_hierarchy(step)
    436             try:
--> 437                 trace = _mp_sample(**sample_args)
    438             except pickle.PickleError:
    439                 _log.warning("Could not pickle model, sampling singlethreaded.")

//anaconda3/lib/python3.7/site-packages/pymc3/sampling.py in _mp_sample(draws, tune, step, chains, cores, chain, random_seed, start, progressbar, trace, model, **kwargs)
    967         try:
    968             with sampler:
--> 969                 for draw in sampler:
    970                     trace = traces[draw.chain - chain]
    971                     if (trace.supports_sampler_stats

//anaconda3/lib/python3.7/site-packages/pymc3/parallel_sampling.py in __iter__(self)
    391 
    392         while self._active:
--> 393             draw = ProcessAdapter.recv_draw(self._active)
    394             proc, is_last, draw, tuning, stats, warns = draw
    395             if self._progress is not None:

//anaconda3/lib/python3.7/site-packages/pymc3/parallel_sampling.py in recv_draw(processes, timeout)
    295             else:
    296                 error = RuntimeError("Chain %s failed." % proc.chain)
--> 297             raise error from old_error
    298         elif msg[0] == "writing_done":
    299             proc._readable = True

RuntimeError: Chain 1 failed.

If I must use cores=1, that's not the end of the world either.

Thank you very kindly in advance.

I think that's the only option, because of the way pymc3 works (it pickles the sampler and pushes it to a multiprocessing worker).
If you use cores=1, the logp and gradient evaluation still uses your multiprocessing pool, though.

Thank you for the reply and information!

The problem is that pymc3 creates a daemon process for each chain it samples. Those processes aren't allowed (a restriction from the Python standard library) to create child processes of their own. But if you want to sample several chains in parallel, where each chain uses multiple cores, nobody is stopping you from doing this with several Python processes. If you write a script that samples one chain with pm.sample(cores=1) and stores the trace, e.g. in a NetCDF file (convert it with ArviZ), you can run several instances of that script in parallel and then combine those files with xarray.concat.
Keep in mind that this is probably only a good idea if the parallel code runs for a very short time or if you have a lot of cores.
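
For example, a minimal sketch of that workflow (file names, seed handling, and the combine step are illustrative only; it assumes ArviZ and xarray are installed):

# run_one_chain.py -- hypothetical script; run one copy per chain you want
import sys

import arviz as az
import pymc3 as pm

chain_id = int(sys.argv[1])   # e.g. python run_one_chain.py 0

with pm.Model():
    # ... rebuild the same model as above (priors, DensityDist, ...) ...
    trace = pm.sample(draws=100, tune=10, chains=1, cores=1,
                      random_seed=chain_id)
    # convert the single-chain trace and write it to a NetCDF file
    az.from_pymc3(trace).to_netcdf("chain_%d.nc" % chain_id)

Afterwards the per-chain files can be merged, e.g.:

# combine_chains.py -- hypothetical script to merge the per-chain files
import glob

import arviz as az
import xarray as xr

idatas = [az.from_netcdf(f) for f in sorted(glob.glob("chain_*.nc"))]
for i, idata in enumerate(idatas):
    idata.posterior["chain"] = [i]   # give each file a distinct chain index
posterior = xr.concat([idata.posterior for idata in idatas], dim="chain")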
