Pickle error while using lambda function

Dear all,
Can you please suggest how I can correct the following code to avoid the pickle error?

import pymc3 as pm
import theano.tensor as tt
import pymc_forward_model as PFM

# create our Op (data and XX come from earlier in the script)
sigma = 1
mu = PFM.MyLikelihoodFunctionGrad(PFM.log_likelihood, data, sigma, XX)
#mu = PFM.MyLikelihoodFunction(PFM.log_likelihood, data, sigma, XX)

My_model = pm.Model()

with My_model:
    
    a = pm.Normal('a', mu=0, sd=500)
    # b and c are defined similarly to a (omitted here)
    unknowns = tt.as_tensor_variable([a, b, c])
    
    pm.DensityDist('likelihood', lambda v: mu(v), observed={'v': unknowns})
    trace = pm.sample(10000)

Error

Could not pickle model, sampling singlethreaded.

Instead of using a lambda, use

def my_mu(v):
    return mu(v)

as a top-level function in your module.

Hi!
I have the same problem with the "black box" likelihood function method: https://docs.pymc.io/notebooks/blackbox_external_likelihood.html
However, with this modification I get this error: NameError: name 'v' is not defined

original: pm.DensityDist('likelihood', lambda v: logl(v), observed={'v': theta})
modified: pm.DensityDist('likelihood', my_mu(v), observed={'v': theta})

It should be

pm.DensityDist('likelihood', my_mu, observed={'v': theta})

as you need to pass a function to DensityDist.
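
To illustrate the difference, a minimal sketch (note that v only exists inside DensityDist's machinery, not in your script):

# correct: pass the function object; DensityDist calls it with the observed 'v'
pm.DensityDist('likelihood', my_mu, observed={'v': theta})

# incorrect: my_mu(v) tries to evaluate the function immediately, and since
# no variable named v exists at that point, Python raises the NameError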

Yes indeed, thank you!

But now I get this new error: BrokenPipeError: [Errno 32] Broken pipe
Any idea about it?

That's unfortunately a Windows problem with no fix currently. The only workaround is to run sample with cores=1.
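
For example, with the call from the model above:

# workaround on Windows: run everything in a single process, so that the
# model never needs to be pickled for worker processes
trace = pm.sample(10000, cores=1)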

Oh :(, I tried to add if __name__ == "__main__": according to
https://discourse.pymc.io/t/multiprocessing-windows-10-brokenpipeerror-errno-32-broken-pipe/2259/9

But it doesn't seem to be working …

But did you manage to get the exception that leads to the broken pipe by following those instructions? Are you running a Jupyter notebook?

Initially I was using Spyder, but I tried with Jupyter and got the same problem. And I did not really get how to find the exception. Here is the problem:

import numpy as np
import theano.tensor as tt
import pymc3 as pm
import time
from scipy.stats import multivariate_normal

def my_model(theta, x):
    [...]
    return prediction


def my_loglike(theta, x, data, sigma):

    model = my_model(theta, x)

    # use logpdf, not pdf, since this is meant to return a log-likelihood
    return multivariate_normal(data, sigma).logpdf(model)

class LogLike(tt.Op):

    itypes = [tt.dvector] # expects a vector of parameter values when called
    otypes = [tt.dscalar] # outputs a single scalar value (the log likelihood)

    def __init__(self, loglike, data, x, sigma):

        # add inputs as class attributes
        self.likelihood = loglike
        self.data = data
        self.x = x
        self.sigma = sigma

    def perform(self, node, inputs, outputs):
        # the method that is used when calling the Op
        theta, = inputs  # this will contain my variables

        # call the log-likelihood function
        logl = self.likelihood(theta, self.x, self.data, self.sigma)

        outputs[0][0] = np.array(logl) # output the log-likelihood




ndraws = 500
nburn = 0
chains = 2
njobs = 2

# create our Op (data, x and sigma are defined earlier in the full script)
logl = LogLike(my_loglike, data, x, sigma)


def my_mu(v):
    return logl(v)

tim_init = time.process_time()

# use PyMC3 to sample from the log-likelihood
if __name__ == "__main__":
    with pm.Model() as model1:
        var1 = pm.Triangular('var1', lower=0.2, upper=1.8, c=1)
        var2 = pm.Triangular('var2', lower=0.2, upper=1.8, c=1)
        var3 = pm.Triangular('var3', lower=0.2, upper=1.8, c=1)
        var4 = pm.Triangular('var4', lower=0.2, upper=1.8, c=1)
    
    
        # convert the parameters to a tensor vector
        theta = tt.as_tensor_variable([var1, var2, var3, var4])
    
        # use a DensityDist (pass the top-level my_mu function to "call" the Op)
        pm.DensityDist('likelihood', my_mu, observed={'v': theta})
        step = pm.Slice()
        trace = pm.sample(ndraws, tune=nburn, discard_tuned_samples=True, chains=chains, step=step)

and the traceback:

---------------------------------------------------------------------------
BrokenPipeError                           Traceback (most recent call last)
<ipython-input-2-48365da83eef> in <module>
    498         pm.DensityDist('likelihood',  my_mu, observed={'v': theta})# lambda v: logl(v)
    499         step = pm.Slice()
--> 500         trace = pm.sample(ndraws, tune=nburn, discard_tuned_samples=True, chains=chains, step=step)
    501 
    502 

~\Anaconda2\envs\pymc\lib\site-packages\pymc3\sampling.py in sample(draws, step, init, n_init, start, trace, chain_idx, chains, cores, tune, nuts_kwargs, step_kwargs, progressbar, model, random_seed, live_plot, discard_tuned_samples, live_plot_kwargs, compute_convergence_checks, use_mmap, **kwargs)
    437             _print_step_hierarchy(step)
    438             try:
--> 439                 trace = _mp_sample(**sample_args)
    440             except pickle.PickleError:
    441                 _log.warning("Could not pickle model, sampling singlethreaded.")

~\Anaconda2\envs\pymc\lib\site-packages\pymc3\sampling.py in _mp_sample(draws, tune, step, chains, cores, chain, random_seed, start, progressbar, trace, model, use_mmap, **kwargs)
    984         sampler = ps.ParallelSampler(
    985             draws, tune, chains, cores, random_seed, start, step,
--> 986             chain, progressbar)
    987         try:
    988             try:

~\Anaconda2\envs\pymc\lib\site-packages\pymc3\parallel_sampling.py in __init__(self, draws, tune, chains, cores, seeds, start_points, step_method, start_chain_num, progressbar)
    311                 draws, tune, step_method, chain + start_chain_num, seed, start
    312             )
--> 313             for chain, seed, start in zip(range(chains), seeds, start_points)
    314         ]
    315 

~\Anaconda2\envs\pymc\lib\site-packages\pymc3\parallel_sampling.py in <listcomp>(.0)
    311                 draws, tune, step_method, chain + start_chain_num, seed, start
    312             )
--> 313             for chain, seed, start in zip(range(chains), seeds, start_points)
    314         ]
    315 

~\Anaconda2\envs\pymc\lib\site-packages\pymc3\parallel_sampling.py in __init__(self, draws, tune, step_method, chain, seed, start)
    202         )
    203         # We fork right away, so that the main process can start tqdm threads
--> 204         self._process.start()
    205 
    206     @property

~\Anaconda2\envs\pymc\lib\multiprocessing\process.py in start(self)
    103                'daemonic processes are not allowed to have children'
    104         _cleanup()
--> 105         self._popen = self._Popen(self)
    106         self._sentinel = self._popen.sentinel
    107         # Avoid a refcycle if the target function holds an indirect

~\Anaconda2\envs\pymc\lib\multiprocessing\context.py in _Popen(process_obj)
    221     @staticmethod
    222     def _Popen(process_obj):
--> 223         return _default_context.get_context().Process._Popen(process_obj)
    224 
    225 class DefaultContext(BaseContext):

~\Anaconda2\envs\pymc\lib\multiprocessing\context.py in _Popen(process_obj)
    320         def _Popen(process_obj):
    321             from .popen_spawn_win32 import Popen
--> 322             return Popen(process_obj)
    323 
    324     class SpawnContext(BaseContext):

~\Anaconda2\envs\pymc\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
     63             try:
     64                 reduction.dump(prep_data, to_child)
---> 65                 reduction.dump(process_obj, to_child)
     66             finally:
     67                 set_spawning_popen(None)

~\Anaconda2\envs\pymc\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
     58 def dump(obj, file, protocol=None):
     59     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60     ForkingPickler(file, protocol).dump(obj)
     61 
     62 #

BrokenPipeError: [Errno 32] Broken pipe
The first thing you should try is whether you get an error if you add cores=1 to the above call to sample. The usual situation we see is that users can sample without any problems when cores=1.

Now, to see the actual exception that is causing the broken pipe you have to:

  1. Open a command prompt. If you used Anaconda to install everything, then open an Anaconda prompt.
  2. Use cd to reach the directory where the script that you were editing with Spyder is stored.
  3. Run the script as a batch script. That means executing python.exe your_script.py or python your_script.py, depending on how your command prompt is configured.
  4. The traceback will be a bit chaotic. There will be 2 or 3 tracebacks of independent exceptions all printed out at the same time. My suggestion is to just copy the full thing and paste it here.

Why does the traceback look nasty? When sample is called with more than a single core, a pool of parallel worker processes is spawned. These processes are independent from the root process in which sample was called. They start out with a setup phase, then receive the payload that they have to work on (sampling their chain) and, if everything goes well up until then, they get to work and communicate their progress to the root process. If an exception happens while they are working on their payload, they communicate the problem to the root process, which in turn raises it to the user's attention, and terminates.

However, if an exception happens during the setup phase, the spawned process is still not aware that it must communicate its exceptions to another running process, so it does what Python does by default: it prints the exception to stderr. When you run from the command prompt, stderr gets shown in the command prompt itself. In the Spyder terminal or the Jupyter notebook, the default stderr is somewhere else, so users don't see the exception that was raised during the spawn's setup.

You may wonder what the broken pipe is, then. The root process that tried to create the pool of workers also created a communication channel, or pipe, to speak with the spawns. However, as the spawns fail to be initialized, the communication pipe gets broken, so the root process raises the useless broken pipe error. It's useless because the error that would help you debug your problem is the one raised by the spawns.
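
If it helps to see the mechanism in isolation, here is a minimal standalone sketch (not pymc3 code) of how Windows' spawn start method re-runs module-level code in every child:

import multiprocessing as mp

def work(q):
    q.put("done")

# module-level code: with the Windows "spawn" start method, every child
# process re-imports this module, so this line runs in the children too
print("runs in the parent AND in every spawned child")

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=work, args=(q,))
    p.start()   # the child re-imports the module before calling work()
    print(q.get())
    p.join()

If anything at module level raises an exception, the child dies during this re-import, before the communication channel is set up, and the parent is left with the broken pipe.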

Interesting explanations :),
here is the result from the Anaconda prompt:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\Username\Anaconda2\envs\pymc\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:\Users\Username\Anaconda2\envs\pymc\lib\multiprocessing\spawn.py", line 114, in _main
    prepare(preparation_data)
  File "C:\Users\Username\Anaconda2\envs\pymc\lib\multiprocessing\spawn.py", line 225, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
Traceback (most recent call last):
  File "C:\Users\Username\Anaconda2\envs\pymc\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
  File "BLACKBOXE_withgrad.py", line 506, in <module>
    run_name="__mp_main__")
      File "C:\Users\Username\Anaconda2\envs\pymc\lib\runpy.py", line 263, in run_path
trace = pm.sample(ndraws, tune=nburn, discard_tuned_samples=True, chains=chains, step=step,cores=cores)
pkg_name=pkg_name, script_name=fname)  File "C:\Users\Username\Anaconda2\envs\pymc\lib\site-packages\pymc3\sampling.py", line 439, in sample

      File "C:\Users\Username\Anaconda2\envs\pymc\lib\runpy.py", line 96, in _run_module_code
trace = _mp_sample(**sample_args)
mod_name, mod_spec, pkg_name, script_name)  File "C:\Users\Username\Anaconda2\envs\pymc\lib\site-packages\pymc3\sampling.py", line 986, in _mp_sample

  File "C:\Users\Username\Anaconda2\envs\pymc\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
      File "C:\Users\Username\Desktop\BLACKBOXE_withgrad.py", line 553, in <module>
chain, progressbar)
_ = pm.traceplot(trace)  File "C:\Users\Username\Anaconda2\envs\pymc\lib\site-packages\pymc3\parallel_sampling.py", line 313, in __init__

    NameErrorfor chain, seed, start in zip(range(chains), seeds, start_points):
name 'trace' is not defined  File "C:\Users\Username\Anaconda2\envs\pymc\lib\site-packages\pymc3\parallel_sampling.py", line 313, in <listcomp>

    for chain, seed, start in zip(range(chains), seeds, start_points)
  File "C:\Users\Username\Anaconda2\envs\pymc\lib\site-packages\pymc3\parallel_sampling.py", line 204, in __init__
    self._process.start()
  File "C:\Users\Username\Anaconda2\envs\pymc\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Users\Username\Anaconda2\envs\pymc\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\Username\Anaconda2\envs\pymc\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\Username\Anaconda2\envs\pymc\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\Username\Anaconda2\envs\pymc\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
BrokenPipeError: [Errno 32] Broken pipe
The NameError: name 'trace' is not defined is your underlying exception. It also seems that you are running an older version of pymc3: in the current version, that line is line 361, not 313. Could you update pymc3 to the version hosted on GitHub and retry?

Indeed, I updated pymc3 with pip install git+https://github.com/pymc-devs/pymc3, and I get this:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\multiprocessing\spawn.py", line 114, in _main
    prepare(preparation_data)
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\multiprocessing\spawn.py", line 225, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
    run_name="__mp_main__")
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\runpy.py", line 263, in run_path
    pkg_name=pkg_name, script_name=fname)
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\runpy.py", line 96, in _run_module_code
    mod_name, mod_spec, pkg_name, script_name)
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\Users\Username\Desktop\BLACKBOXE_withgrad.py", line 553, in <module>
    _ = pm.traceplot(trace)
NameError: name 'trace' is not defined
Traceback (most recent call last):
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\site-packages\pymc3\parallel_sampling.py", line 242, in __init__
    self._process.start()
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "BLACKBOXE_withgrad.py", line 506, in <module>
    trace = pm.sample(ndraws, tune=nburn, discard_tuned_samples=True, chains=chains, step=step,cores=cores)
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\site-packages\pymc3\sampling.py", line 432, in sample
    trace = _mp_sample(**sample_args)
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\site-packages\pymc3\sampling.py", line 961, in _mp_sample
    chain, progressbar)
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\site-packages\pymc3\parallel_sampling.py", line 361, in __init__
    for chain, seed, start in zip(range(chains), seeds, start_points)
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\site-packages\pymc3\parallel_sampling.py", line 361, in <listcomp>
    for chain, seed, start in zip(range(chains), seeds, start_points)
  File "C:\Users\Username\Anaconda2\envs\pymc2\lib\site-packages\pymc3\parallel_sampling.py", line 251, in __init__
    raise exc
RuntimeError: The communication pipe between the main process and its spawned children is broken.
In Windows OS, this usually means that the child process raised an exception while it was being spawned, before it was setup to communicate to the main process.
The exceptions raised by the child process while spawning cannot be caught or handled from the main process, and when running from an IPython or jupyter notebook interactive kernel, the child's exception and traceback appears to be lost.
A known way to see the child's error, and try to fix or handle it, is to run the problematic code as a batch script from a system's Command Prompt. The child's exception will be printed to the Command Promt's stderr, and it should be visible above this error and traceback.
Note that if running a jupyter notebook that was invoked from a Command Prompt, the child's exception should have been printed to the Command Prompt on which the notebook is running.
Great, now the traceback is much cleaner and points to the relevant line that tried to get the trace but failed.

Maybe the solution is then to install Ubuntu?

Hi!
I am using the same example, but when I try to use HMC, which needs to calculate the gradient, it raises this UserWarning (from the gradient function):

UserWarning: Derivative calculation did not converge: setting flat derivative.

Any idea how to overcome this warning?

Thank You!

Try moving the trace plot into the model context used during sampling. Could you also paste the full batch script that you’re using here or upload it to a gist?

The problem is hard to debug on Windows but should be fixable.

Please see this thread and let me know if this helps
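
Concretely, something along these lines (a sketch of the intended layout, not your exact script):

if __name__ == "__main__":
    with pm.Model() as model1:
        # ... priors, theta, DensityDist as before ...
        trace = pm.sample(ndraws, tune=nburn, discard_tuned_samples=True,
                          chains=chains, step=step, cores=cores)

    # everything that uses `trace` stays under the __main__ guard, so the
    # spawned worker processes don't run it when they re-import the script
    _ = pm.traceplot(trace)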

Here is the full script (I just replaced the my_model function with a simple example, because the real one is a large Python program). I did not get what you mean by "Try moving the trace plot into the model context used during sampling"?

import pymc3 as pm
import numpy as np
import theano.tensor as tt


COUNT = 0
def increment():
    global COUNT
    COUNT += 1

def standardize(x):
    return (x - x.mean()) / x.std()

def my_model(theta, x):
    var1, var2 = theta
    prediction = x*var1 + var2
    increment()
    return prediction


def my_loglike(theta, x, data, sigma):

    model = standardize(my_model(theta, x))
    return -(0.5/sigma**2)*np.sum((data - model)**2)


class LogLike(tt.Op):

    itypes = [tt.dvector] # expects a vector of parameter values when called
    otypes = [tt.dscalar] # outputs a single scalar value (the log likelihood)

    def __init__(self, loglike, data, x, sigma):

        # add inputs as class attributes
        self.likelihood = loglike
        self.data = data
        self.x = x
        self.sigma = sigma

    def perform(self, node, inputs, outputs):
        # the method that is used when calling the Op
        theta, = inputs  # this will contain my variables

        # call the log-likelihood function
        logl = self.likelihood(theta, self.x, self.data, self.sigma)

        outputs[0][0] = np.array(logl) # output the log-likelihood




ndraws = 5000
nburn = 0
chains = 4
njobs = 2
cores = 2
x = np.arange(0, 24, 1)
data = x*10 + 2
data = standardize(data)
sigma = 1

# create our Op
logl = LogLike(my_loglike, data, x, sigma)


def my_mu(v):
    return logl(v)


# use PyMC3 to sample from the log-likelihood
if __name__ == "__main__":
    with pm.Model() as model1:

        var1 = pm.Normal('var1', mu=8, sd=10)
        var2 = pm.Normal('var2', mu=3, sd=10)

    
        # convert the parameters to a tensor vector
        theta = tt.as_tensor_variable([var1, var2])
    
        # use a DensityDist (pass the top-level my_mu function to "call" the Op)
        pm.DensityDist('likelihood', my_mu, observed={'v': theta})
        step = pm.Slice()
        trace = pm.sample(ndraws, tune=nburn, discard_tuned_samples=True, chains=chains, step=step,cores=cores)

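# NOTE: everything from here down runs at module level, outside the
# __main__ guard; on Windows, the spawned worker processes re-import this
# script and execute these lines, where `trace` does not exist yet,
# which is the NameError seen in the traceback above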
_ = pm.traceplot(trace)



print('COUNT : %s'%(COUNT))
print('COUNT/ITER : %s'%(COUNT/(ndraws+nburn)/chains))
print('ndraws : %s'%(ndraws))
print('nburn : %s'%(nburn))
print('chains : %s'%(chains))
print('cores : %s'%(cores))


accept = np.sum(trace['var1'][1:] != trace['var1'][:-1])
print("Acceptance Rate var1: ", accept/trace['var1'].shape[0])
#print("count per tAccepted values: ", (COUNT/(ndraws+nburn)/chains)/(accept/trace['var1'].shape[0]))
print("COUNT per Accepted values var1: ", (COUNT/(accept)))

I think this refers to a different issue, which has already been solved by replacing the lambda function with:

def my_mu(v):
    return mu(v)