Recommended way to run many PyMC models in parallel (joblib) without triggering compiledir / cache lock errors?

Hi everyone,

I have a project where I need to fit thousands of independent PyMC models, and I’m trying to parallelize at the model level. My setup is:

  • Python 3.10

  • PyMC / PyTensor

  • Parallelism via joblib (loky backend)

  • Each worker subprocess constructs its own pm.Model() and runs inference (ADVI via pm.fit)

This works for a subset of the models, but when I scale to many workers I consistently hit errors like:

filelock._error.Timeout:
   The file lock '/home/ubuntu/.pytensor/compiledir_.../.lock' could not be acquired.

Sometimes joblib reports:

A worker stopped while some jobs were given to the executor...

From reading other issues, it seems this happens because several worker processes can trigger PyTensor compilation at the same time. They all share the same compiledir, so they race for its .lock file, and one or more workers time out while waiting.

My questions

  1. Is there a recommended or supported way to run many PyMC models in parallel without running into compiledir locking?

  2. Should/could I manually set a per-worker compiledir in each subprocess?

  3. Is there a PyMC config flag I should set at worker initialization?
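
For question 2, this is roughly what I have in mind: a minimal sketch, not something I know to be the supported approach. It assumes that PyTensor reads the PYTENSOR_FLAGS environment variable at import time and that base_compiledir is the right flag to redirect the cache, so it can only work if pytensor/pymc have not yet been imported in the worker process. The names set_per_worker_compiledir and fit_one are hypothetical placeholders, not functions from my package.

```python
import os
import tempfile


def set_per_worker_compiledir(root=None):
    """Point PyTensor at a compile directory unique to this process.

    Assumption: this runs before `pytensor` (or `pymc`) is first imported
    in the process, since the flags are read from the environment on import.
    """
    root = root or os.path.join(tempfile.gettempdir(), "pytensor_workers")
    base = os.path.join(root, f"worker_{os.getpid()}")
    os.makedirs(base, exist_ok=True)

    # Keep any flags that are already set, replacing only base_compiledir.
    existing = os.environ.get("PYTENSOR_FLAGS", "")
    flags = [
        f for f in existing.split(",")
        if f and not f.startswith("base_compiledir=")
    ]
    flags.append(f"base_compiledir={base}")
    os.environ["PYTENSOR_FLAGS"] = ",".join(flags)
    return base


def fit_one(task_id):
    # Hypothetical worker function standing in for my real fitting code.
    compiledir = set_per_worker_compiledir()
    # import pymc as pm   # import only after the flag has been set
    # ... build pm.Model() and call pm.fit(...) here ...
    return task_id, compiledir
```

I would then dispatch it with something like joblib.Parallel(n_jobs=...)(joblib.delayed(fit_one)(i) for i in range(n_modes)). The obvious downside is that every worker starts with a cold cache and recompiles everything, so I am unsure whether this is better than simply raising the lock timeout.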

In case it helps, here is an example traceback of the error:

/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py:782: UserWarning: A worker stopped while some jobs were given to the executor. This can be caused by a too short worker timeout or by a memory leak.
  warnings.warn(
---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/vm.py", line 1230, in make_all
    node.op.make_thunk(node, storage_map, compute_map, [], impl=impl)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/op.py", line 125, in make_thunk
    return self.make_c_thunk(node, storage_map, compute_map, no_recycling)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/op.py", line 84, in make_c_thunk
    outputs = cl.make_thunk(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1185, in make_thunk
    cthunk, module, in_storage, out_storage, error_storage = self.__compile__(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1102, in __compile__
    thunk, module = self.cthunk_factory(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1626, in cthunk_factory
    module = cache.module_from_key(key=key, lnk=self)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1217, in module_from_key
    module = self._get_from_hash(module_hash, key)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1120, in _get_from_hash
    with lock_ctx():
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/compile/compilelock.py", line 78, in lock_ctx
    fl.acquire(timeout=timeout)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/filelock/_api.py", line 341, in acquire
    raise Timeout(lock_filename)  # noqa: TRY301
filelock._error.Timeout: The file lock '/home/ubuntu/.pytensor/compiledir_Linux-5.4--generic-x86_64-with-glibc2.31-x86_64-3.10.18-64/.lock' could not be acquired.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 490, in _process_worker
    r = call_item()
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 291, in __call__
    return self.fn(*self.args, **self.kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in __call__
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in <listcomp>
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 3499, in fit_single_direct
    direct_model.fit(
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1836, in fit
    self._fit_model_with_advi(progress_bar=progress_bar)
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1708, in _fit_model_with_advi
    self._trace = pm.fit(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/inference.py", line 775, in fit
    return inference.fit(n, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/inference.py", line 158, in fit
    step_func = self.objective.step_function(score=score, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 406, in step_function
    step_fn = compile([], updates.loss, updates=updates, random_seed=seed, **compile_kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/pytensorf.py", line 947, in compile
    pytensor_function = pytensor.function(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/compile/function/__init__.py", line 332, in function
    fn = pfunc(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/compile/function/pfunc.py", line 466, in pfunc
    return orig_function(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/compile/function/types.py", line 1835, in orig_function
    fn = m.create(defaults)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/compile/function/types.py", line 1719, in create
    _fn, _i, _o = self.linker.make_thunk(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/basic.py", line 245, in make_thunk
    return self.make_all(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/vm.py", line 1239, in make_all
    raise_with_op(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/utils.py", line 526, in raise_with_op
    raise exc_value.with_traceback(exc_trace)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/vm.py", line 1230, in make_all
    node.op.make_thunk(node, storage_map, compute_map, [], impl=impl)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/op.py", line 125, in make_thunk
    return self.make_c_thunk(node, storage_map, compute_map, no_recycling)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/op.py", line 84, in make_c_thunk
    outputs = cl.make_thunk(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1185, in make_thunk
    cthunk, module, in_storage, out_storage, error_storage = self.__compile__(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1102, in __compile__
    thunk, module = self.cthunk_factory(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1626, in cthunk_factory
    module = cache.module_from_key(key=key, lnk=self)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1217, in module_from_key
    module = self._get_from_hash(module_hash, key)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1120, in _get_from_hash
    with lock_ctx():
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/compile/compilelock.py", line 78, in lock_ctx
    fl.acquire(timeout=timeout)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/filelock/_api.py", line 341, in acquire
    raise Timeout(lock_filename)  # noqa: TRY301
filelock._error.Timeout: The file lock 'The file lock '/home/ubuntu/.pytensor/compiledir_Linux-5.4--generic-x86_64-with-glibc2.31-x86_64-3.10.18-64/.lock' could not be acquired.
Apply node that caused the error: Join(0, [-2.896989 ... .80814301], intercept_offset_site_adapt_vi_replacement)
Toposort index: 27
Inputs types: [TensorType(int8, shape=()), TensorType(float64, shape=(189,)), TensorType(float64, shape=(1,))]

Backtrace when the node is created (use PyTensor flag traceback__limit=N to make it longer):
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 490, in _process_worker
    r = call_item()
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 291, in __call__
    return self.fn(*self.args, **self.kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in __call__
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in <listcomp>
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 3499, in fit_single_direct
    direct_model.fit(
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1813, in fit
    mean_effects = self._model_all_mean_effects(
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1333, in _model_all_mean_effects
    self._model_categorical_mean_effect(
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1235, in _model_categorical_mean_effect
    pt.concatenate(

HINT: Use a linker other than the C linker to print the inputs' shapes and strides.
HINT: Use the PyTensor flag `exception_verbosity=high` for a debug print-out and storage map footprint of this Apply node.' could not be acquired.
"""

The above exception was the direct cause of the following exception:

Timeout                                   Traceback (most recent call last)
File <timed exec>:22

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:4060, in SpectralNormativeModel.adapt_fit(self, covariate_to_adapt, new_category_names, encoded_train_data, covariates_dataframe, pretrained_model_params, n_jobs, save_directory, save_separate)
   4057     pretrained_model_params = copy.deepcopy(self.model_params)
   4059 # Fit the adapted model
-> 4060 self.fit(
   4061     encoded_train_data,
   4062     covariates_dataframe,
   4063     n_modes=pretrained_model_params["n_modes"],
   4064     n_jobs=n_jobs,
   4065     save_directory=save_directory,
   4066     save_separate=save_separate,
   4067     covariance_structure=pretrained_model_params["sparse_covariance_structure"],
   4068     adapt={
   4069         "covariate_to_adapt": covariate_to_adapt,
   4070         "new_category_names": new_category_names,
   4071         "pretrained_model_params": pretrained_model_params,
   4072     },
   4073 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:3928, in SpectralNormativeModel.fit(self, encoded_train_data, covariates_dataframe, n_modes, n_jobs, save_directory, save_separate, covariance_structure, adapt)
   3921     utils.general.prepare_save_directory(
   3922         save_directory,
   3923         "spectral_normative_model",
   3924     )
   3926 logger.info("Step 1; direct models for each eigenmode (%s modes)", n_modes)
-> 3928 self.fit_all_direct(
   3929     encoded_train_data=encoded_train_data,
   3930     covariates_dataframe=covariates_dataframe,
   3931     n_modes=n_modes,
   3932     n_jobs=n_jobs,
   3933     save_directory=save_directory,
   3934     save_separate=save_separate,
   3935     adapt=adapt,
   3936 )
   3938 logger.info("Step 2; identify sparse covariance structure")
   3940 self.identify_covariance_structure(
   3941     encoded_train_data=encoded_train_data,
   3942     covariates_dataframe=covariates_dataframe,
   (...)
   3945     adapt=adapt,
   3946 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:3684, in SpectralNormativeModel.fit_all_direct(self, encoded_train_data, covariates_dataframe, n_modes, n_jobs, save_directory, save_separate, adapt)
   3655 # Fit the base direct model for each eigenmode using parallel processing
   3656 tasks = (
   3657     joblib.delayed(self.fit_single_direct)(
   3658         variable_of_interest=encoded_train_data[:, i],
   (...)
   3681     for i in range(n_modes)
   3682 )
   3683 self.direct_model_params = list(
-> 3684     utils.parallel.ParallelTqdm(
   3685         n_jobs=n_jobs,
   3686         total_tasks=n_modes,
   3687         desc="Fitting direct models",
   3688     )(tasks),  # pyright: ignore[reportCallIssue]
   3689 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/utils/parallel.py:88, in ParallelTqdm.__call__(self, iterable)
     86             self.total_tasks = len(iterable)
     87     # call parent function
---> 88     return super().__call__(iterable)
     89 finally:
     90     # close tqdm progress bar
     91     if self.progress_bar is not None:

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:2072, in Parallel.__call__(self, iterable)
   2066 # The first item from the output is blank, but it makes the interpreter
   2067 # progress until it enters the Try/Except block of the generator and
   2068 # reaches the first `yield` statement. This starts the asynchronous
   2069 # dispatch of the tasks to the workers.
   2070 next(output)
-> 2072 return output if self.return_generator else list(output)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1682, in Parallel._get_outputs(self, iterator, pre_dispatch)
   1679     yield
   1681     with self._backend.retrieval_context():
-> 1682         yield from self._retrieve()
   1684 except GeneratorExit:
   1685     # The generator has been garbage collected before being fully
   1686     # consumed. This aborts the remaining tasks if possible and warn
   1687     # the user if necessary.
   1688     self._exception = True

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1784, in Parallel._retrieve(self)
   1778 while self._wait_retrieval():
   1779     # If the callback thread of a worker has signaled that its task
   1780     # triggered an exception, or if the retrieval loop has raised an
   1781     # exception (e.g. `GeneratorExit`), exit the loop and surface the
   1782     # worker traceback.
   1783     if self._aborting:
-> 1784         self._raise_error_fast()
   1785         break
   1787     nb_jobs = len(self._jobs)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1859, in Parallel._raise_error_fast(self)
   1855 # If this error job exists, immediately raise the error by
   1856 # calling get_result. This job might not exists if abort has been
   1857 # called directly or if the generator is gc'ed.
   1858 if error_job is not None:
-> 1859     error_job.get_result(self.timeout)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:758, in BatchCompletionCallBack.get_result(self, timeout)
    752 backend = self.parallel._backend
    754 if backend.supports_retrieve_callback:
    755     # We assume that the result has already been retrieved by the
    756     # callback thread, and is stored internally. It's just waiting to
    757     # be returned.
--> 758     return self._return_or_raise()
    760 # For other backends, the main thread needs to run the retrieval step.
    761 try:

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:773, in BatchCompletionCallBack._return_or_raise(self)
    771 try:
    772     if self.status == TASK_ERROR:
--> 773         raise self._result
    774     return self._result
    775 finally:

Timeout: The file lock 'The file lock '/home/ubuntu/.pytensor/compiledir_Linux-5.4--generic-x86_64-with-glibc2.31-x86_64-3.10.18-64/.lock' could not be acquired.
Apply node that caused the error: Join(0, [-2.896989 ... .80814301], intercept_offset_site_adapt_vi_replacement)
Toposort index: 27
Inputs types: [TensorType(int8, shape=()), TensorType(float64, shape=(189,)), TensorType(float64, shape=(1,))]

Backtrace when the node is created (use PyTensor flag traceback__limit=N to make it longer):
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 490, in _process_worker
    r = call_item()
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 291, in __call__
    return self.fn(*self.args, **self.kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in __call__
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in <listcomp>
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 3499, in fit_single_direct
    direct_model.fit(
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1813, in fit
    mean_effects = self._model_all_mean_effects(
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1333, in _model_all_mean_effects
    self._model_categorical_mean_effect(
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1235, in _model_categorical_mean_effect
    pt.concatenate(

HINT: Use a linker other than the C linker to print the inputs' shapes and strides.
HINT: Use the PyTensor flag `exception_verbosity=high` for a debug print-out and storage map footprint of this Apply node.' could not be acquired.

And here is a different error that the same run sometimes raises instead:

2025-11-14 21:47:19 : [INFO] - spectranorm.snm - Starting SNM model fitting:
2025-11-14 21:47:19 : [INFO] - spectranorm.snm - Step 1; direct models for each eigenmode (10000 modes)


Fitting direct models:
---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 490, in _process_worker
    r = call_item()
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 291, in __call__
    return self.fn(*self.args, **self.kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in __call__
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in <listcomp>
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 3499, in fit_single_direct
    direct_model.fit(
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1836, in fit
    self._fit_model_with_advi(progress_bar=progress_bar)
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1708, in _fit_model_with_advi
    self._trace = pm.fit(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/inference.py", line 775, in fit
    return inference.fit(n, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/inference.py", line 158, in fit
    step_func = self.objective.step_function(score=score, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 393, in step_function
    updates = self.updates(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 268, in updates
    self.add_obj_updates(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 313, in add_obj_updates
    obj_target = self(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 458, in __call__
    a = self.op.apply(self.tf)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/operators.py", line 63, in apply
    return -self.datalogp_norm + self.beta * (self.logq_norm - self.varlogp_norm)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 498, in <lambda>
    datalogp_norm = property(lambda self: self.approx.datalogp_norm)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
    v = method(self, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1377, in datalogp_norm
    return self.datalogp / self.symbolic_normalizing_constant
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
    v = method(self, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1341, in datalogp
    return self.sized_symbolic_datalogp.mean(0)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
    v = method(self, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1321, in sized_symbolic_datalogp
    return self._sized_symbolic_varlogp_and_datalogp[1]  # shape (s,)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
    v = method(self, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1309, in _sized_symbolic_varlogp_and_datalogp
    [self.model.varlogp, self.model.datalogp]
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/model/core.py", line 810, in varlogp
    return self.logp(vars=self.free_RVs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/model/core.py", line 696, in logp
    rv_logps = transformed_conditional_logp(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/logprob/basic.py", line 595, in transformed_conditional_logp
    temp_logp_terms = conditional_logp(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/logprob/basic.py", line 479, in conditional_logp
    fgraph = construct_ir_fgraph(rv_values, ir_rewriter=ir_rewriter)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/logprob/rewriting.py", line 254, in construct_ir_fgraph
    ir_rewriter.rewrite(fgraph)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 120, in rewrite
    return self.apply(fgraph, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 292, in apply
    sub_prof = rewriter.apply(fgraph)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 292, in apply
    sub_prof = rewriter.apply(fgraph)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 2456, in apply
    sub_prof = grewrite.apply(fgraph)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 2040, in apply
    nb += self.process_node(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1925, in process_node
    self.failure_callback(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1780, in warn_inplace
    return cls.warn(exc, nav, repl_pairs, node_rewriter, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1768, in warn
    raise exc
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1922, in process_node
    replacements = node_rewriter.transform(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1086, in transform
    return self.fn(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/tensor/rewriting/basic.py", line 1160, in constant_folding
    return unconditional_constant_folding.transform(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1086, in transform
    return self.fn(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/tensor/rewriting/basic.py", line 1109, in unconditional_constant_folding
    thunk = node.op.make_thunk(node, storage_map, compute_map, no_recycling=[])
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/op.py", line 125, in make_thunk
    return self.make_c_thunk(node, storage_map, compute_map, no_recycling)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/op.py", line 84, in make_c_thunk
    outputs = cl.make_thunk(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1185, in make_thunk
    cthunk, module, in_storage, out_storage, error_storage = self.__compile__(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1102, in __compile__
    thunk, module = self.cthunk_factory(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1626, in cthunk_factory
    module = cache.module_from_key(key=key, lnk=self)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1217, in module_from_key
    module = self._get_from_hash(module_hash, key)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1122, in _get_from_hash
    key_data.add_key(key, save_pkl=bool(key[0]))
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 550, in add_key
    assert key not in self.keys
AssertionError
"""

The above exception was the direct cause of the following exception:

AssertionError                            Traceback (most recent call last)
File <timed exec>:24

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:4060, in SpectralNormativeModel.adapt_fit(self, covariate_to_adapt, new_category_names, encoded_train_data, covariates_dataframe, pretrained_model_params, n_jobs, save_directory, save_separate)
   4057     pretrained_model_params = copy.deepcopy(self.model_params)
   4059 # Fit the adapted model
-> 4060 self.fit(
   4061     encoded_train_data,
   4062     covariates_dataframe,
   4063     n_modes=pretrained_model_params["n_modes"],
   4064     n_jobs=n_jobs,
   4065     save_directory=save_directory,
   4066     save_separate=save_separate,
   4067     covariance_structure=pretrained_model_params["sparse_covariance_structure"],
   4068     adapt={
   4069         "covariate_to_adapt": covariate_to_adapt,
   4070         "new_category_names": new_category_names,
   4071         "pretrained_model_params": pretrained_model_params,
   4072     },
   4073 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:3928, in SpectralNormativeModel.fit(self, encoded_train_data, covariates_dataframe, n_modes, n_jobs, save_directory, save_separate, covariance_structure, adapt)
   3921     utils.general.prepare_save_directory(
   3922         save_directory,
   3923         "spectral_normative_model",
   3924     )
   3926 logger.info("Step 1; direct models for each eigenmode (%s modes)", n_modes)
-> 3928 self.fit_all_direct(
   3929     encoded_train_data=encoded_train_data,
   3930     covariates_dataframe=covariates_dataframe,
   3931     n_modes=n_modes,
   3932     n_jobs=n_jobs,
   3933     save_directory=save_directory,
   3934     save_separate=save_separate,
   3935     adapt=adapt,
   3936 )
   3938 logger.info("Step 2; identify sparse covariance structure")
   3940 self.identify_covariance_structure(
   3941     encoded_train_data=encoded_train_data,
   3942     covariates_dataframe=covariates_dataframe,
   (...)
   3945     adapt=adapt,
   3946 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:3684, in SpectralNormativeModel.fit_all_direct(self, encoded_train_data, covariates_dataframe, n_modes, n_jobs, save_directory, save_separate, adapt)
   3655 # Fit the base direct model for each eigenmode using parallel processing
   3656 tasks = (
   3657     joblib.delayed(self.fit_single_direct)(
   3658         variable_of_interest=encoded_train_data[:, i],
   (...)
   3681     for i in range(n_modes)
   3682 )
   3683 self.direct_model_params = list(
-> 3684     utils.parallel.ParallelTqdm(
   3685         n_jobs=n_jobs,
   3686         total_tasks=n_modes,
   3687         desc="Fitting direct models",
   3688     )(tasks),  # pyright: ignore[reportCallIssue]
   3689 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/utils/parallel.py:88, in ParallelTqdm.__call__(self, iterable)
     86             self.total_tasks = len(iterable)
     87     # call parent function
---> 88     return super().__call__(iterable)
     89 finally:
     90     # close tqdm progress bar
     91     if self.progress_bar is not None:

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:2072, in Parallel.__call__(self, iterable)
   2066 # The first item from the output is blank, but it makes the interpreter
   2067 # progress until it enters the Try/Except block of the generator and
   2068 # reaches the first `yield` statement. This starts the asynchronous
   2069 # dispatch of the tasks to the workers.
   2070 next(output)
-> 2072 return output if self.return_generator else list(output)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1682, in Parallel._get_outputs(self, iterator, pre_dispatch)
   1679     yield
   1681     with self._backend.retrieval_context():
-> 1682         yield from self._retrieve()
   1684 except GeneratorExit:
   1685     # The generator has been garbage collected before being fully
   1686     # consumed. This aborts the remaining tasks if possible and warn
   1687     # the user if necessary.
   1688     self._exception = True

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1784, in Parallel._retrieve(self)
   1778 while self._wait_retrieval():
   1779     # If the callback thread of a worker has signaled that its task
   1780     # triggered an exception, or if the retrieval loop has raised an
   1781     # exception (e.g. `GeneratorExit`), exit the loop and surface the
   1782     # worker traceback.
   1783     if self._aborting:
-> 1784         self._raise_error_fast()
   1785         break
   1787     nb_jobs = len(self._jobs)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1859, in Parallel._raise_error_fast(self)
   1855 # If this error job exists, immediately raise the error by
   1856 # calling get_result. This job might not exists if abort has been
   1857 # called directly or if the generator is gc'ed.
   1858 if error_job is not None:
-> 1859     error_job.get_result(self.timeout)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:758, in BatchCompletionCallBack.get_result(self, timeout)
    752 backend = self.parallel._backend
    754 if backend.supports_retrieve_callback:
    755     # We assume that the result has already been retrieved by the
    756     # callback thread, and is stored internally. It's just waiting to
    757     # be returned.
--> 758     return self._return_or_raise()
    760 # For other backends, the main thread needs to run the retrieval step.
    761 try:

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:773, in BatchCompletionCallBack._return_or_raise(self)
    771 try:
    772     if self.status == TASK_ERROR:
--> 773         raise self._result
    774     return self._result
    775 finally:

AssertionError: 

You can set pytensor.config.compiledir to something unique per process so that the C cache is isolated. You’ll have a higher disk footprint and no sharing of the cache, so the first runs will be slower.

This should be done before everything else runs.

I’ve gotten around this by running in the cloud, e.g. with AWS Batch (Efficient Batch Processing - AWS Batch - AWS). There is an up-front time cost to getting set up, but then you can run massively parallel at low cost…


So I tried doing that, but I think I ran into an error because I didn’t do it before everything else, and I’m not sure what I need to change.

Let me give you some more details. This is the error I run into:

---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 490, in _process_worker
    r = call_item()
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 291, in __call__
    return self.fn(*self.args, **self.kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in __call__
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in <listcomp>
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 3492, in fit_single_direct
    pytensor.config.compiledir = unique_dir
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 427, in __set__
    raise Exception(
Exception: Can't change the value of compiledir config parameter after initialization!
"""

The above exception was the direct cause of the following exception:

Exception                                 Traceback (most recent call last)
File <timed exec>:24

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:4074, in SpectralNormativeModel.adapt_fit(self, covariate_to_adapt, new_category_names, encoded_train_data, covariates_dataframe, pretrained_model_params, n_jobs, save_directory, save_separate)
   4071     pretrained_model_params = copy.deepcopy(self.model_params)
   4073 # Fit the adapted model
-> 4074 self.fit(
   4075     encoded_train_data,
   4076     covariates_dataframe,
   4077     n_modes=pretrained_model_params["n_modes"],
   4078     n_jobs=n_jobs,
   4079     save_directory=save_directory,
   4080     save_separate=save_separate,
   4081     covariance_structure=pretrained_model_params["sparse_covariance_structure"],
   4082     adapt={
   4083         "covariate_to_adapt": covariate_to_adapt,
   4084         "new_category_names": new_category_names,
   4085         "pretrained_model_params": pretrained_model_params,
   4086     },
   4087 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:3942, in SpectralNormativeModel.fit(self, encoded_train_data, covariates_dataframe, n_modes, n_jobs, save_directory, save_separate, covariance_structure, adapt)
   3935     utils.general.prepare_save_directory(
   3936         save_directory,
   3937         "spectral_normative_model",
   3938     )
   3940 logger.info("Step 1; direct models for each eigenmode (%s modes)", n_modes)
-> 3942 self.fit_all_direct(
   3943     encoded_train_data=encoded_train_data,
   3944     covariates_dataframe=covariates_dataframe,
   3945     n_modes=n_modes,
   3946     n_jobs=n_jobs,
   3947     save_directory=save_directory,
   3948     save_separate=save_separate,
   3949     adapt=adapt,
   3950 )
   3952 logger.info("Step 2; identify sparse covariance structure")
   3954 self.identify_covariance_structure(
   3955     encoded_train_data=encoded_train_data,
   3956     covariates_dataframe=covariates_dataframe,
   (...)
   3959     adapt=adapt,
   3960 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:3698, in SpectralNormativeModel.fit_all_direct(self, encoded_train_data, covariates_dataframe, n_modes, n_jobs, save_directory, save_separate, adapt)
   3669 # Fit the base direct model for each eigenmode using parallel processing
   3670 tasks = (
   3671     joblib.delayed(self.fit_single_direct)(
   3672         variable_of_interest=encoded_train_data[:, i],
   (...)
   3695     for i in range(n_modes)
   3696 )
   3697 self.direct_model_params = list(
-> 3698     utils.parallel.ParallelTqdm(
   3699         n_jobs=n_jobs,
   3700         total_tasks=n_modes,
   3701         desc="Fitting direct models",
   3702     )(tasks),  # pyright: ignore[reportCallIssue]
   3703 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/utils/parallel.py:88, in ParallelTqdm.__call__(self, iterable)
     86             self.total_tasks = len(iterable)
     87     # call parent function
---> 88     return super().__call__(iterable)
     89 finally:
     90     # close tqdm progress bar
     91     if self.progress_bar is not None:

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:2072, in Parallel.__call__(self, iterable)
   2066 # The first item from the output is blank, but it makes the interpreter
   2067 # progress until it enters the Try/Except block of the generator and
   2068 # reaches the first `yield` statement. This starts the asynchronous
   2069 # dispatch of the tasks to the workers.
   2070 next(output)
-> 2072 return output if self.return_generator else list(output)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1682, in Parallel._get_outputs(self, iterator, pre_dispatch)
   1679     yield
   1681     with self._backend.retrieval_context():
-> 1682         yield from self._retrieve()
   1684 except GeneratorExit:
   1685     # The generator has been garbage collected before being fully
   1686     # consumed. This aborts the remaining tasks if possible and warn
   1687     # the user if necessary.
   1688     self._exception = True

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1784, in Parallel._retrieve(self)
   1778 while self._wait_retrieval():
   1779     # If the callback thread of a worker has signaled that its task
   1780     # triggered an exception, or if the retrieval loop has raised an
   1781     # exception (e.g. `GeneratorExit`), exit the loop and surface the
   1782     # worker traceback.
   1783     if self._aborting:
-> 1784         self._raise_error_fast()
   1785         break
   1787     nb_jobs = len(self._jobs)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1859, in Parallel._raise_error_fast(self)
   1855 # If this error job exists, immediately raise the error by
   1856 # calling get_result. This job might not exists if abort has been
   1857 # called directly or if the generator is gc'ed.
   1858 if error_job is not None:
-> 1859     error_job.get_result(self.timeout)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:758, in BatchCompletionCallBack.get_result(self, timeout)
    752 backend = self.parallel._backend
    754 if backend.supports_retrieve_callback:
    755     # We assume that the result has already been retrieved by the
    756     # callback thread, and is stored internally. It's just waiting to
    757     # be returned.
--> 758     return self._return_or_raise()
    760 # For other backends, the main thread needs to run the retrieval step.
    761 try:

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:773, in BatchCompletionCallBack._return_or_raise(self)
    771 try:
    772     if self.status == TASK_ERROR:
--> 773         raise self._result
    774     return self._result
    775 finally:

Exception: Can't change the value of compiledir config parameter after initialization!

And, basically, I’ve added this to the start of the function I call via joblib.Parallel:

        # ----------------------------------------------
        # Give each worker its own PyTensor compiledir
        # ----------------------------------------------
        unique_dir = os.path.join(
            tempfile.gettempdir(),
            "pytensor_cache_" + str(uuid.uuid4())
        )
        pytensor.config.compiledir = unique_dir
        # ----------------------------------------------

How can I ensure the compiledir is defined at the very start (before everything else)? On the other hand, is there a way to overwrite/reset the compiledir?

You can do it like this at the top of your script:

from pytensor import config
config.compiledir = "test"

Importantly, it has to be done before you explicitly import pymc or pytensor for other purposes. Other libraries / code can be used before that just fine.

You can also set it via environment flags if that’s easier, but probably not for this purpose? See the “config – PyTensor Configuration” page in the PyTensor dev documentation.
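For reference, here is a minimal sketch of the environment-flag route, assuming the `PYTENSOR_FLAGS` variable with a `compiledir=` entry. The helper name `with_compiledir` and the `pytensor_cache_` directory prefix are made up for illustration, and the sketch assumes the path contains no commas. It keeps any flags that are already set and only swaps the compiledir entry:

```python
import os
import tempfile
import uuid
from pathlib import Path


def with_compiledir(existing_flags: str, compiledir: Path) -> str:
    """Return a PYTENSOR_FLAGS value with compiledir set, keeping other flags."""
    kept = [
        flag.strip()
        for flag in existing_flags.split(",")
        # Drop empty entries and any previous compiledir entry.
        if flag.strip() and not flag.strip().startswith("compiledir=")
    ]
    return ",".join([*kept, f"compiledir={compiledir}"])


# Hypothetical per-process directory under the system temp dir.
compiledir = Path(tempfile.gettempdir()) / f"pytensor_cache_{uuid.uuid4().hex}"
compiledir.mkdir(parents=True, exist_ok=True)

# This has to run before the first `import pytensor` / `import pymc`
# in the process, because the flags are read at import time.
os.environ["PYTENSOR_FLAGS"] = with_compiledir(
    os.environ.get("PYTENSOR_FLAGS", ""), compiledir
)
```

Note that if this is set once in the parent before the workers are launched, every worker inherits the same directory, so for per-worker isolation it would need to run inside each worker.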

Alright, so I think I was able to somehow achieve that via joblib’s initializer. Setting the following function as the initializer, it started running with no errors:

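import os
import tempfile
import uuid
from pathlib import Path


def set_unique_pytensor_compiledir():
    # Build a per-worker cache directory under the system temp dir
    unique_dir = Path(
        tempfile.gettempdir(),
        f"spectranorm_pytensor_{uuid.uuid4().hex}",
    )
    # PyTensor reads PYTENSOR_FLAGS when it is first imported
    os.environ["PYTENSOR_FLAGS"] = f"compiledir={unique_dir}"

    # ensure directory exists
    unique_dir.mkdir(parents=True, exist_ok=True)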
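Since the flag only takes effect if it is set before pytensor is imported in that process, a variant of the initializer with a guard might help when debugging. This is only a sketch: the name `init_worker_compiledir` and the warning text are illustrative, and checking `sys.modules` is just one way to detect an earlier import.

```python
import os
import sys
import tempfile
import uuid
import warnings
from pathlib import Path


def init_worker_compiledir(prefix: str = "spectranorm_pytensor_") -> Path:
    """Worker initializer: point PYTENSOR_FLAGS at a fresh compiledir."""
    if "pytensor" in sys.modules:
        # Flags are read when pytensor is first imported, so setting the
        # environment variable now would not change the compiledir.
        warnings.warn(
            "pytensor is already imported in this process; "
            "PYTENSOR_FLAGS will not change its compiledir.",
            RuntimeWarning,
            stacklevel=2,
        )
    unique_dir = Path(tempfile.gettempdir()) / f"{prefix}{uuid.uuid4().hex}"
    unique_dir.mkdir(parents=True, exist_ok=True)
    os.environ["PYTENSOR_FLAGS"] = f"compiledir={unique_dir}"
    return unique_dir
```

If the warning fires in a worker, that worker is presumably still using whatever compiledir it had when pytensor was first imported.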


However, after completing more than 3,000 of the 10,000 tasks, it stops with the following error:

2025-11-16 23:41:13 : [INFO] - spectranorm.snm - Starting SNM model fitting:
2025-11-16 23:41:13 : [INFO] - spectranorm.snm - Step 1; direct models for each eigenmode (10000 modes)

Fitting direct models:  31% 3090/10000 [1:02:59<6:17:50,  3.28s/tasks]

/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py:782: UserWarning: A worker stopped while some jobs were given to the executor. This can be caused by a too short worker timeout or by a memory leak.
  warnings.warn(
Exception ignored in atexit callback: <bound method ModuleCache._on_atexit of <pytensor.link.c.cmodule.ModuleCache object at 0x7fd568abb400>>
Traceback (most recent call last):
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1588, in _on_atexit
    self.clear_old()
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1387, in clear_old
    too_old_to_use = self.refresh(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 824, in refresh
    with lock_ctx():
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/compile/compilelock.py", line 78, in lock_ctx
    fl.acquire(timeout=timeout)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/filelock/_api.py", line 341, in acquire
    raise Timeout(lock_filename)  # noqa: TRY301
filelock._error.Timeout: The file lock '/home/ubuntu/.pytensor/compiledir_Linux-5.4--generic-x86_64-with-glibc2.31-x86_64-3.10.18-64/.lock' could not be acquired.

---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 490, in _process_worker
    r = call_item()
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 291, in __call__
    return self.fn(*self.args, **self.kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in __call__
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in <listcomp>
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 3503, in fit_single_direct
    direct_model.fit(
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1840, in fit
    self._fit_model_with_advi(progress_bar=progress_bar)
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1712, in _fit_model_with_advi
    self._trace = pm.fit(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/inference.py", line 775, in fit
    return inference.fit(n, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/inference.py", line 158, in fit
    step_func = self.objective.step_function(score=score, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 393, in step_function
    updates = self.updates(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 268, in updates
    self.add_obj_updates(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 313, in add_obj_updates
    obj_target = self(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 458, in __call__
    a = self.op.apply(self.tf)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/operators.py", line 63, in apply
    return -self.datalogp_norm + self.beta * (self.logq_norm - self.varlogp_norm)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 498, in <lambda>
    datalogp_norm = property(lambda self: self.approx.datalogp_norm)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
    v = method(self, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1377, in datalogp_norm
    return self.datalogp / self.symbolic_normalizing_constant
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
    v = method(self, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1341, in datalogp
    return self.sized_symbolic_datalogp.mean(0)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
    v = method(self, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1321, in sized_symbolic_datalogp
    return self._sized_symbolic_varlogp_and_datalogp[1]  # shape (s,)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
    v = method(self, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1309, in _sized_symbolic_varlogp_and_datalogp
    [self.model.varlogp, self.model.datalogp]
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/model/core.py", line 810, in varlogp
    return self.logp(vars=self.free_RVs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/model/core.py", line 696, in logp
    rv_logps = transformed_conditional_logp(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/logprob/basic.py", line 595, in transformed_conditional_logp
    temp_logp_terms = conditional_logp(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/logprob/basic.py", line 479, in conditional_logp
    fgraph = construct_ir_fgraph(rv_values, ir_rewriter=ir_rewriter)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/logprob/rewriting.py", line 254, in construct_ir_fgraph
    ir_rewriter.rewrite(fgraph)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 120, in rewrite
    return self.apply(fgraph, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 292, in apply
    sub_prof = rewriter.apply(fgraph)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 292, in apply
    sub_prof = rewriter.apply(fgraph)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 2456, in apply
    sub_prof = grewrite.apply(fgraph)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 2040, in apply
    nb += self.process_node(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1925, in process_node
    self.failure_callback(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1780, in warn_inplace
    return cls.warn(exc, nav, repl_pairs, node_rewriter, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1768, in warn
    raise exc
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1922, in process_node
    replacements = node_rewriter.transform(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1086, in transform
    return self.fn(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/tensor/rewriting/basic.py", line 1160, in constant_folding
    return unconditional_constant_folding.transform(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1086, in transform
    return self.fn(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/tensor/rewriting/basic.py", line 1109, in unconditional_constant_folding
    thunk = node.op.make_thunk(node, storage_map, compute_map, no_recycling=[])
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/op.py", line 125, in make_thunk
    return self.make_c_thunk(node, storage_map, compute_map, no_recycling)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/op.py", line 84, in make_c_thunk
    outputs = cl.make_thunk(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1185, in make_thunk
    cthunk, module, in_storage, out_storage, error_storage = self.__compile__(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1102, in __compile__
    thunk, module = self.cthunk_factory(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1626, in cthunk_factory
    module = cache.module_from_key(key=key, lnk=self)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1217, in module_from_key
    module = self._get_from_hash(module_hash, key)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1122, in _get_from_hash
    key_data.add_key(key, save_pkl=bool(key[0]))
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 550, in add_key
    assert key not in self.keys
AssertionError
"""

The above exception was the direct cause of the following exception:

AssertionError                            Traceback (most recent call last)
File <timed exec>:24

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:4065, in SpectralNormativeModel.adapt_fit(self, covariate_to_adapt, new_category_names, encoded_train_data, covariates_dataframe, pretrained_model_params, n_jobs, save_directory, save_separate)
   4062     pretrained_model_params = copy.deepcopy(self.model_params)
   4064 # Fit the adapted model
-> 4065 self.fit(
   4066     encoded_train_data,
   4067     covariates_dataframe,
   4068     n_modes=pretrained_model_params["n_modes"],
   4069     n_jobs=n_jobs,
   4070     save_directory=save_directory,
   4071     save_separate=save_separate,
   4072     covariance_structure=pretrained_model_params["sparse_covariance_structure"],
   4073     adapt={
   4074         "covariate_to_adapt": covariate_to_adapt,
   4075         "new_category_names": new_category_names,
   4076         "pretrained_model_params": pretrained_model_params,
   4077     },
   4078 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:3933, in SpectralNormativeModel.fit(self, encoded_train_data, covariates_dataframe, n_modes, n_jobs, save_directory, save_separate, covariance_structure, adapt)
   3926     utils.general.prepare_save_directory(
   3927         save_directory,
   3928         "spectral_normative_model",
   3929     )
   3931 logger.info("Step 1; direct models for each eigenmode (%s modes)", n_modes)
-> 3933 self.fit_all_direct(
   3934     encoded_train_data=encoded_train_data,
   3935     covariates_dataframe=covariates_dataframe,
   3936     n_modes=n_modes,
   3937     n_jobs=n_jobs,
   3938     save_directory=save_directory,
   3939     save_separate=save_separate,
   3940     adapt=adapt,
   3941 )
   3943 logger.info("Step 2; identify sparse covariance structure")
   3945 self.identify_covariance_structure(
   3946     encoded_train_data=encoded_train_data,
   3947     covariates_dataframe=covariates_dataframe,
   (...)
   3950     adapt=adapt,
   3951 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:3688, in SpectralNormativeModel.fit_all_direct(self, encoded_train_data, covariates_dataframe, n_modes, n_jobs, save_directory, save_separate, adapt)
   3659 # Fit the base direct model for each eigenmode using parallel processing
   3660 tasks = (
   3661     joblib.delayed(self.fit_single_direct)(
   3662         variable_of_interest=encoded_train_data[:, i],
   (...)
   3685     for i in range(n_modes)
   3686 )
   3687 self.direct_model_params = list(
-> 3688     utils.parallel.ParallelTqdm(
   3689         n_jobs=n_jobs,
   3690         total_tasks=n_modes,
   3691         desc="Fitting direct models",
   3692         initializer=utils.general.set_unique_pytensor_compiledir,
   3693     )(tasks),  # pyright: ignore[reportCallIssue]
   3694 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/utils/parallel.py:88, in ParallelTqdm.__call__(self, iterable)
     86             self.total_tasks = len(iterable)
     87     # call parent function
---> 88     return super().__call__(iterable)
     89 finally:
     90     # close tqdm progress bar
     91     if self.progress_bar is not None:

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:2072, in Parallel.__call__(self, iterable)
   2066 # The first item from the output is blank, but it makes the interpreter
   2067 # progress until it enters the Try/Except block of the generator and
   2068 # reaches the first `yield` statement. This starts the asynchronous
   2069 # dispatch of the tasks to the workers.
   2070 next(output)
-> 2072 return output if self.return_generator else list(output)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1682, in Parallel._get_outputs(self, iterator, pre_dispatch)
   1679     yield
   1681     with self._backend.retrieval_context():
-> 1682         yield from self._retrieve()
   1684 except GeneratorExit:
   1685     # The generator has been garbage collected before being fully
   1686     # consumed. This aborts the remaining tasks if possible and warn
   1687     # the user if necessary.
   1688     self._exception = True

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1784, in Parallel._retrieve(self)
   1778 while self._wait_retrieval():
   1779     # If the callback thread of a worker has signaled that its task
   1780     # triggered an exception, or if the retrieval loop has raised an
   1781     # exception (e.g. `GeneratorExit`), exit the loop and surface the
   1782     # worker traceback.
   1783     if self._aborting:
-> 1784         self._raise_error_fast()
   1785         break
   1787     nb_jobs = len(self._jobs)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1859, in Parallel._raise_error_fast(self)
   1855 # If this error job exists, immediately raise the error by
   1856 # calling get_result. This job might not exists if abort has been
   1857 # called directly or if the generator is gc'ed.
   1858 if error_job is not None:
-> 1859     error_job.get_result(self.timeout)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:758, in BatchCompletionCallBack.get_result(self, timeout)
    752 backend = self.parallel._backend
    754 if backend.supports_retrieve_callback:
    755     # We assume that the result has already been retrieved by the
    756     # callback thread, and is stored internally. It's just waiting to
    757     # be returned.
--> 758     return self._return_or_raise()
    760 # For other backends, the main thread needs to run the retrieval step.
    761 try:

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:773, in BatchCompletionCallBack._return_or_raise(self)
    771 try:
    772     if self.status == TASK_ERROR:
--> 773         raise self._result
    774     return self._result
    775 finally:

AssertionError: 

Do you have any idea why this might be happening?

Can you print pytensor.config.compiledir from the launched jobs to see if it’s set as expected?

Yes, you’re right. After printing it, I can see that changing the OS environment flag did not actually change the compiledir. Moreover, setting the compiledir directly in the initializer still gives this error:

Exception: Can't change the value of compiledir config parameter after initialization!

This is unexpected, considering that joblib’s initializer runs at the very start of the subprocess. I guess it happens because pytensor is also used in the parent process.

Is there any way to reload pytensor or reset the compiledir?
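For reference, here is a minimal sketch of the per-worker approach discussed above, not a confirmed fix. It assumes that PYTENSOR_FLAGS accepts a base_compiledir entry and that the compiledir can only be chosen before pytensor is first imported in a process, as the error message suggests. The helper name set_per_process_compiledir is hypothetical, and it would need to live in a module that does not itself import pytensor, otherwise the import happens before the initializer runs:

```python
import os
import sys
import tempfile


def set_per_process_compiledir(root=None):
    """Point PyTensor at a compile directory unique to this process.

    Hypothetical helper: it only edits the PYTENSOR_FLAGS environment
    variable, so it has to run before `import pytensor` in the worker.
    """
    if "pytensor" in sys.modules:
        # The compiledir cannot be changed once pytensor has been imported.
        raise RuntimeError("pytensor is already imported in this process")

    root = root or tempfile.gettempdir()
    compiledir = os.path.join(root, f"pytensor_compiledir_{os.getpid()}")
    os.makedirs(compiledir, exist_ok=True)

    # Keep any other flags, but drop a previously set base_compiledir entry.
    existing = os.environ.get("PYTENSOR_FLAGS", "")
    flags = [
        f for f in existing.split(",")
        if f and not f.startswith("base_compiledir=")
    ]
    flags.append(f"base_compiledir={compiledir}")
    os.environ["PYTENSOR_FLAGS"] = ",".join(flags)
    return compiledir
```

If the RuntimeError fires inside the worker, that would indicate pytensor is being imported before the initializer runs (for example while the task or initializer is being unpickled), which would be consistent with the error above.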

Maybe @lucianopaz has some pointers

In the meantime, I wasn’t able to get the C compiler to work with parallel processing over thousands of jobs (I kept running into lock access issues). Another solution that ended up working for me was switching to the Python linker by setting the following environment variable before importing pytensor:

os.environ["PYTENSOR_FLAGS"] = "linker=py"

Although switching to the Python linker makes each process slower, being able to run many processes in parallel gave a faster overall execution time (I have access to a cloud machine with >50 CPU cores).
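One caveat with the assignment above is that it overwrites whatever PYTENSOR_FLAGS already contains. A small sketch that keeps other flags intact, assuming the usual comma-separated name=value format (add_pytensor_flag is a hypothetical helper, not part of pytensor):

```python
import os


def add_pytensor_flag(name, value, env=os.environ):
    """Set one `name=value` entry in PYTENSOR_FLAGS, keeping other flags."""
    flags = [f for f in env.get("PYTENSOR_FLAGS", "").split(",") if f]
    # Drop any earlier setting of the same flag so the new value is the only one.
    flags = [f for f in flags if not f.startswith(f"{name}=")]
    flags.append(f"{name}={value}")
    env["PYTENSOR_FLAGS"] = ",".join(flags)
    return env["PYTENSOR_FLAGS"]


# Must run before `import pytensor` for the flag to take effect.
add_pytensor_flag("linker", "py")
```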

Setting mode=NUMBA or mode=JAX also eliminates the need for the C compiler, while offering speed benefits.

We had success at StitchFix in using nutpie compiled models with ray. Note, they have awesome MLOps people that configured everything from the ray server side, but we were routinely fitting hundreds of thousands of models (in fact, the same compiled model!).

Unfortunately I am no longer there so I cannot share the code we used. But I recall it was very easy to link all of this together…

TL;DR use ray if you can.
