Multiprocessing (windows 10) BrokenPipeError: [Errno 32] Broken pipe

While running the disaster model (https://github.com/pymc-devs/pymc3/blob/master/pymc3/examples/disaster_model_theano_op.py) with 2 cores on my computer, I get the following error:

ForkingPickler(file, protocol).dump(obj)
BrokenPipeError: [Errno 32] Broken pipe

I guess it has something to do with multiprocessing and Theano's as_op. It works when I set cores to 1. Any ideas on how to make it work with multiprocessing on Windows 10?

Looks like issue #3140. @aseyboldt did some digging and saw that it’s a Windows-related issue. If you run your code as a script and enclose the main calculations in an if __name__ == '__main__': block, the code should run without crashing. Could you try that?

I enclosed the whole script in an if __name__ == '__main__': block:

"""
Similar to disaster_model.py, but for arbitrary
deterministics which are not working with Theano.
Note that gradient based samplers will not work.
"""


import pymc3 as pm
from theano.compile.ops import as_op
import theano.tensor as tt
from numpy import arange, array, empty

if __name__ == '__main__':
    
    __all__ = ['disasters_data', 'switchpoint', 'early_mean', 'late_mean', 'rate',
               'disasters']
    
    # Time series of recorded coal mining disasters in the UK from 1851 to 1962
    disasters_data = array([4, 5, 4, 0, 1, 4, 3, 4, 0, 6, 3, 3, 4, 0, 2, 6,
                            3, 3, 5, 4, 5, 3, 1, 4, 4, 1, 5, 5, 3, 4, 2, 5,
                            2, 2, 3, 4, 2, 1, 3, 2, 2, 1, 1, 1, 1, 3, 0, 0,
                            1, 0, 1, 1, 0, 0, 3, 1, 0, 3, 2, 2, 0, 1, 1, 1,
                            0, 1, 0, 1, 0, 0, 0, 2, 1, 0, 0, 0, 1, 1, 0, 2,
                            3, 3, 1, 1, 2, 1, 1, 1, 1, 2, 4, 2, 0, 0, 1, 4,
                            0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1])
    years = len(disasters_data)
    
    
    @as_op(itypes=[tt.lscalar, tt.dscalar, tt.dscalar], otypes=[tt.dvector])
    def rate_(switchpoint, early_mean, late_mean):
        out = empty(years)
        out[:switchpoint] = early_mean
        out[switchpoint:] = late_mean
        return out
    
    
    with pm.Model() as model:
    
        # Prior for distribution of switchpoint location
        switchpoint = pm.DiscreteUniform('switchpoint', lower=0, upper=years)
        # Priors for pre- and post-switch mean number of disasters
        early_mean = pm.Exponential('early_mean', lam=1.)
        late_mean = pm.Exponential('late_mean', lam=1.)
    
        # Allocate appropriate Poisson rates to years before and after current
        # switchpoint location
        idx = arange(years)
        rate = rate_(switchpoint, early_mean, late_mean)
    
        # Data likelihood
        disasters = pm.Poisson('disasters', rate, observed=disasters_data)
    
        # Use slice sampler for means
        step1 = pm.Slice([early_mean, late_mean])
        # Use Metropolis for switchpoint, since it accommodates discrete variables
        step2 = pm.Metropolis([switchpoint])
    
        # Initial values for stochastic nodes
        start = {'early_mean': 2., 'late_mean': 3.}
    
        tr = pm.sample(1000, tune=500, start=start, step=[step1, step2], cores=2)
        pm.traceplot(tr)

I still see the same error:

  File "<ipython-input-4-dcd32d519507>", line 61, in <module>
    tr = pm.sample(1000, tune=500, start=start, step=[step1, step2], cores=2)

  File "C:\Users\Mukesh\Anaconda3\lib\site-packages\pymc3\sampling.py", line 449, in sample
    trace = _mp_sample(**sample_args)

  File "C:\Users\Mukesh\Anaconda3\lib\site-packages\pymc3\sampling.py", line 996, in _mp_sample
    chain, progressbar)

  File "C:\Users\Mukesh\Anaconda3\lib\site-packages\pymc3\parallel_sampling.py", line 275, in __init__
    for chain, seed, start in zip(range(chains), seeds, start_points)

  File "C:\Users\Mukesh\Anaconda3\lib\site-packages\pymc3\parallel_sampling.py", line 275, in <listcomp>
    for chain, seed, start in zip(range(chains), seeds, start_points)

  File "C:\Users\Mukesh\Anaconda3\lib\site-packages\pymc3\parallel_sampling.py", line 182, in __init__
    self._process.start()

  File "C:\Users\Mukesh\Anaconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)

  File "C:\Users\Mukesh\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)

  File "C:\Users\Mukesh\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)

  File "C:\Users\Mukesh\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)

  File "C:\Users\Mukesh\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)

BrokenPipeError: [Errno 32] Broken pipe

Could you please tell us the version numbers of Python, PyMC3, Theano and joblib, and also which OS you are on? We had trouble reproducing the error, and that makes it really hard to fix.

Python: 3.6.3
PyMC3: 3.5
theano: 1.0.3
joblib: 0.11
OS: Windows 10


I’ve managed to reproduce the error but I haven’t been able to determine the cause of the breakage yet. The pickler is able to dump the conflicting process object, so at least that is not the culprit of the pipe breakage. I’ll dig around more tomorrow and get back.
What is certain is that this is exclusively a Windows multiprocessing issue.

Any update on the breakage on Windows?

Sorry, I got sick and couldn’t dig further. I’ll continue today.

I think that I finally found what was causing the problem!

It was quite hard to debug because both Spyder’s and Jupyter’s IPython kernels only raised the BrokenPipeError produced by the root process, but the actual cause was that the child process raised an exception and terminated. When the root process then tried to communicate with the child, it saw that it was dead (such a grim phrase to say out of context…) and raised the broken pipe error we saw. When I executed the code in a system terminal, the child’s exception got printed to the terminal’s stderr and I could see why the child was failing.
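This masking effect can be reproduced in miniature: once the peer end of a multiprocessing pipe is gone, any attempt to send through it raises the same uninformative BrokenPipeError, no matter what actually killed the peer. A minimal sketch (the deliberately closed read end stands in for the dead child):

```python
from multiprocessing import Pipe

# One-way pipe: closing the read end simulates a child process that has
# already died. The write end is what the parent keeps using.
recv_end, send_end = Pipe(duplex=False)
recv_end.close()

try:
    # The parent has no idea *why* the peer is gone; it just gets
    # "[Errno 32] Broken pipe" when it tries to talk to it.
    send_end.send("next_task")
    caught = None
except BrokenPipeError as exc:
    caught = exc

print(type(caught).__name__, caught.errno)
```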

The first main problem was that when the child process attempted to start, it found no enclosing __name__ == "__main__" guard:

RuntimeError:
    An attempt has been made to start a new process before the current process has finished its bootstrapping phase.
    This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module:
    if __name__ == '__main__':
        freeze_support()
        ...
    The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable.

When I simply enclosed all the code in if __name__ == "__main__":, a different error occurred saying that the child process couldn’t unpickle the rate_ function because it was not defined at the module’s root level. The solution was to move only the model construction and sampling inside the if __name__ == "__main__": block, leaving the imports and function definitions at module level. Please, @mukesh, tell me if you can run the following without seeing broken pipes:

"""
Similar to disaster_model.py, but for arbitrary
deterministics which are not working with Theano.
Note that gradient based samplers will not work.
"""


import pymc3 as pm
from theano.compile.ops import as_op
import theano.tensor as tt
from numpy import arange, array, empty

__all__ = ['disasters_data', 'switchpoint', 'early_mean', 'late_mean', 'rate',
           'disasters']

# I had to add this when running inside spyder's IPython kernel, but you can
# remove it if you want to run in a system's terminal
__spec__ = ("ModuleSpec(name='builtins', "
            "loader=<class '_frozen_importlib.BuiltinImporter'>)")


# Time series of recorded coal mining disasters in the UK from 1851 to 1962
disasters_data = array([4, 5, 4, 0, 1, 4, 3, 4, 0, 6, 3, 3, 4, 0, 2, 6,
                        3, 3, 5, 4, 5, 3, 1, 4, 4, 1, 5, 5, 3, 4, 2, 5,
                        2, 2, 3, 4, 2, 1, 3, 2, 2, 1, 1, 1, 1, 3, 0, 0,
                        1, 0, 1, 1, 0, 0, 3, 1, 0, 3, 2, 2, 0, 1, 1, 1,
                        0, 1, 0, 1, 0, 0, 0, 2, 1, 0, 0, 0, 1, 1, 0, 2,
                        3, 3, 1, 1, 2, 1, 1, 1, 1, 2, 4, 2, 0, 0, 1, 4,
                        0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1])
years = len(disasters_data)


# Notice that rate_ is a module level function
@as_op(itypes=[tt.lscalar, tt.dscalar, tt.dscalar], otypes=[tt.dvector])
def rate_(switchpoint, early_mean, late_mean):
    out = empty(years)
    out[:switchpoint] = early_mean
    out[switchpoint:] = late_mean
    return out


if __name__ == "__main__":
    with pm.Model() as model:
    
        # Prior for distribution of switchpoint location
        switchpoint = pm.DiscreteUniform('switchpoint', lower=0, upper=years)
        # Priors for pre- and post-switch mean number of disasters
        early_mean = pm.Exponential('early_mean', lam=1.)
        late_mean = pm.Exponential('late_mean', lam=1.)
    
        # Allocate appropriate Poisson rates to years before and after current
        # switchpoint location
        idx = arange(years)
        rate = rate_(switchpoint, early_mean, late_mean)
    
        # Data likelihood
        disasters = pm.Poisson('disasters', rate, observed=disasters_data)
    
        # Use slice sampler for means
        step1 = pm.Slice([early_mean, late_mean])
        # Use Metropolis for switchpoint, since it accommodates discrete variables
        step2 = pm.Metropolis([switchpoint])
    
        # Initial values for stochastic nodes
        start = {'early_mean': 2., 'late_mean': 3.}
    
        tr = pm.sample(1000, tune=500, start=start, step=[step1, step2], cores=2)
        pm.traceplot(tr)

I honestly don’t know why Unix systems can use multiprocessing without the if __name__ == "__main__" guard but Windows can’t. Furthermore, if the guard is absolutely necessary for Windows multiprocessing, I think that Jupyter notebooks on Windows would also run into broken pipes if the model contains functions that were defined inside the notebook and not in a separate module.


That’s great news! Thanks @lucianopaz!
The reason Windows needs the if __name__ guard is that on Windows multiprocessing creates a completely new process, which then has to import the modules the code is using. On Unix it just forks the process, essentially copying the original process’s memory space (using copy-on-write so that it can do it fast).
It should work on Jupyter, which does stuff internally to make using multiprocessing possible (but if you have a machine handy that fails that way, it would be great if you could try again just to make sure 🙂).
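The unpickling failure in the child comes down to how functions pickle: by module and name, not by value. A small sketch of the principle (rate_fn here is a stand-in for rate_, not part of the original code):

```python
import pickle

def rate_fn(x):
    # Stand-in for rate_: a plain module-level function.
    return x * 2

# Functions are pickled by *reference* (module + qualified name), so the
# spawned child only receives "look up rate_fn in its module" and must be
# able to re-import that module and find the name at the top level.
payload = pickle.dumps(rate_fn)
restored = pickle.loads(payload)

# A function that cannot be found by name fails to pickle at all; one that
# is defined under the __main__ guard pickles fine in the parent but blows
# up with an AttributeError when the spawned child tries to look it up.
try:
    pickle.dumps(lambda x: x * 2)
    lambda_pickled = True
except (pickle.PicklingError, AttributeError):
    lambda_pickled = False

print(restored(3), lambda_pickled)
```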

So I guess this means that the problem is mostly about bad error messages? Somehow we lose the original (meaningful) RuntimeError or Theano error and instead print the BrokenPipeError.
I guess when we receive a message from the other end we should catch BrokenPipeError and try to figure out what happened on the other end. We could also try to make the child process a bit more resilient to errors like that; it shouldn’t die just because it can’t compute a gradient.

The relevant code should be here:

If I read the second warning here correctly, then I’m a bit confused about why we don’t get the error message on the main thread. If there is a Theano problem, it should send that error message to the main thread and then exit. But why does the main thread only see a broken pipe, and not the error message?


Thank you for looking into this. I have some code that keeps failing on me. So you are suggesting to move all function definitions outside the if __name__ == "__main__": block? Do you know which parts need to stay inside it?

The key part that must be inside the if __name__ == "__main__": block is pm.sample, because the process pool is created there. Try it, and if that works, great, but you could still get a broken pipe exception.

As @aseyboldt said, the main problem is that if a child process fails and raises an exception for whatever reason, the parent process does not find out about it. The parent goes on with its tasks, keeps trying to communicate with the child, then discovers that the communication pipe is broken and raises the BrokenPipeError exception, which is not helpful at all for finding out what caused the child to break.

What allowed me to see the exception raised by the child was running the script in a dedicated system terminal. The child process was able to write to this terminal’s stderr, so its exception became visible and I could start solving the problem.

My recommendation then is to reorganize your code to put pm.sample inside if __name__ == "__main__":, but also to run your code in a dedicated system terminal. At least that way, if your code keeps failing, you can see the failed child’s traceback.

Thanks for the explanation! I’ll test it in jupyter and let you know if the problem persists there.

I completely agree with you that the problem is about bad error messages. Maybe what is happening is that the child process communicates the exception and exits before the parent processes that response. Then, if the parent tries to send anything to the child, it sees the broken pipe. Maybe, after hitting an exception, the child should communicate it and then wait for an abort message from the parent before exiting, instead of communicating, continuing the run, finishing and exiting.

@aseyboldt, I also get the broken pipe on a jupyter notebook. I’ll check if I can force the child to wait to receive the abort message before exiting.

However, I just noticed that the child’s error gets printed to the terminal that is running the notebook!

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:...\Anaconda3\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:...\Anaconda3\lib\multiprocessing\spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
  File "C:...\Anaconda3\lib\site-packages\theano\compile\ops.py", line 517, in load_back
    obj = getattr(module, name)
AttributeError: module '__main__' has no attribute 'rate_'

It is not possible to make the child processes more robust because they never even get constructed. We have to try to capture the messages sent to stderr by the process spawner…

Thank you for getting back. Moving the rate_ function out of the if __name__ == '__main__': block solved the problem on Windows 10. I can run the above code without seeing broken pipes. Also, your idea of running in a system terminal will help me a lot.