Alright, so I think I was able to somehow achieve that via joblib’s initializer. Setting the following function as the initializer, it started running with no errors:
def set_unique_pytensor_compiledir():
    """Give the current process its own, freshly created PyTensor compile directory.

    Meant to be used as a per-worker initializer (e.g. for joblib) so that
    parallel workers do not share one compile cache and its lock file.

    A new directory named ``spectranorm_pytensor_<uuid4 hex>`` is created under
    the system temporary directory and advertised through the
    ``PYTENSOR_FLAGS`` environment variable. Flags that are already present in
    ``PYTENSOR_FLAGS`` are preserved; only a previous ``compiledir=`` entry is
    replaced.

    Returns:
        None. The function works purely through side effects (one directory
        created, one environment variable updated).

    Warns:
        RuntimeWarning: if ``pytensor`` has already been imported in this
            process, because the flag is then unlikely to be honoured.
    """
    # Function-scope imports keep this block self-contained.
    import sys
    import warnings

    unique_dir = Path(
        tempfile.gettempdir(),
        f"spectranorm_pytensor_{uuid.uuid4().hex}",
    )
    # Create the directory before pointing PyTensor at it.
    unique_dir.mkdir(parents=True, exist_ok=True)

    # PYTENSOR_FLAGS is a comma-separated list of key=value pairs. Keep what
    # the user already configured and drop only a stale compiledir entry
    # ("base_compiledir=" does not match this prefix and is kept).
    existing_flags = os.environ.get("PYTENSOR_FLAGS", "")
    kept_flags = [
        flag.strip()
        for flag in existing_flags.split(",")
        if flag.strip() and not flag.strip().startswith("compiledir=")
    ]
    os.environ["PYTENSOR_FLAGS"] = ",".join(
        [*kept_flags, f"compiledir={unique_dir}"],
    )

    # NOTE(review): PyTensor reads PYTENSOR_FLAGS when it is first imported.
    # If importing this module (or unpickling the task) already pulled in
    # pytensor/pymc, the process keeps using the default ~/.pytensor compiledir.
    if "pytensor" in sys.modules:
        warnings.warn(
            "pytensor was imported before set_unique_pytensor_compiledir() "
            "ran; the new compiledir flag may be ignored by this process.",
            RuntimeWarning,
            stacklevel=2,
        )
However, after completing more than 3,000 of the 10,000 tasks, it stops with the following error:
2025-11-16 23:41:13 : [INFO] - spectranorm.snm - Starting SNM model fitting:
2025-11-16 23:41:13 : [INFO] - spectranorm.snm - Step 1; direct models for each eigenmode (10000 modes)
Fitting direct models: 31%
3090/10000 [1:02:59<6:17:50, 3.28s/tasks]
/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py:782: UserWarning: A worker stopped while some jobs were given to the executor. This can be caused by a too short worker timeout or by a memory leak.
warnings.warn(
Exception ignored in atexit callback: <bound method ModuleCache._on_atexit of <pytensor.link.c.cmodule.ModuleCache object at 0x7fd568abb400>>
Traceback (most recent call last):
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1588, in _on_atexit
self.clear_old()
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1387, in clear_old
too_old_to_use = self.refresh(
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 824, in refresh
with lock_ctx():
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/contextlib.py", line 135, in __enter__
return next(self.gen)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/compile/compilelock.py", line 78, in lock_ctx
fl.acquire(timeout=timeout)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/filelock/_api.py", line 341, in acquire
raise Timeout(lock_filename) # noqa: TRY301
filelock._error.Timeout: The file lock '/home/ubuntu/.pytensor/compiledir_Linux-5.4--generic-x86_64-with-glibc2.31-x86_64-3.10.18-64/.lock' could not be acquired.
---------------------------------------------------------------------------
_RemoteTraceback Traceback (most recent call last)
_RemoteTraceback:
"""
Traceback (most recent call last):
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 490, in _process_worker
r = call_item()
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 291, in __call__
return self.fn(*self.args, **self.kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in __call__
return [func(*args, **kwargs) for func, args, kwargs in self.items]
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in <listcomp>
return [func(*args, **kwargs) for func, args, kwargs in self.items]
File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 3503, in fit_single_direct
direct_model.fit(
File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1840, in fit
self._fit_model_with_advi(progress_bar=progress_bar)
File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1712, in _fit_model_with_advi
self._trace = pm.fit(
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/inference.py", line 775, in fit
return inference.fit(n, **kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/inference.py", line 158, in fit
step_func = self.objective.step_function(score=score, **kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
return f(*args, **kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 393, in step_function
updates = self.updates(
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 268, in updates
self.add_obj_updates(
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 313, in add_obj_updates
obj_target = self(
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
return f(*args, **kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 458, in __call__
a = self.op.apply(self.tf)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/operators.py", line 63, in apply
return -self.datalogp_norm + self.beta * (self.logq_norm - self.varlogp_norm)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 498, in <lambda>
datalogp_norm = property(lambda self: self.approx.datalogp_norm)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
v = method(self, *args, **kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
return f(*args, **kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1377, in datalogp_norm
return self.datalogp / self.symbolic_normalizing_constant
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
v = method(self, *args, **kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
return f(*args, **kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1341, in datalogp
return self.sized_symbolic_datalogp.mean(0)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
v = method(self, *args, **kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
return f(*args, **kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1321, in sized_symbolic_datalogp
return self._sized_symbolic_varlogp_and_datalogp[1] # shape (s,)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
v = method(self, *args, **kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
return f(*args, **kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1309, in _sized_symbolic_varlogp_and_datalogp
[self.model.varlogp, self.model.datalogp]
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/model/core.py", line 810, in varlogp
return self.logp(vars=self.free_RVs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/model/core.py", line 696, in logp
rv_logps = transformed_conditional_logp(
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/logprob/basic.py", line 595, in transformed_conditional_logp
temp_logp_terms = conditional_logp(
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/logprob/basic.py", line 479, in conditional_logp
fgraph = construct_ir_fgraph(rv_values, ir_rewriter=ir_rewriter)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/logprob/rewriting.py", line 254, in construct_ir_fgraph
ir_rewriter.rewrite(fgraph)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 120, in rewrite
return self.apply(fgraph, *args, **kwargs)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 292, in apply
sub_prof = rewriter.apply(fgraph)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 292, in apply
sub_prof = rewriter.apply(fgraph)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 2456, in apply
sub_prof = grewrite.apply(fgraph)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 2040, in apply
nb += self.process_node(fgraph, node)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1925, in process_node
self.failure_callback(
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1780, in warn_inplace
return cls.warn(exc, nav, repl_pairs, node_rewriter, node)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1768, in warn
raise exc
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1922, in process_node
replacements = node_rewriter.transform(fgraph, node)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1086, in transform
return self.fn(fgraph, node)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/tensor/rewriting/basic.py", line 1160, in constant_folding
return unconditional_constant_folding.transform(fgraph, node)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1086, in transform
return self.fn(fgraph, node)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/tensor/rewriting/basic.py", line 1109, in unconditional_constant_folding
thunk = node.op.make_thunk(node, storage_map, compute_map, no_recycling=[])
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/op.py", line 125, in make_thunk
return self.make_c_thunk(node, storage_map, compute_map, no_recycling)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/op.py", line 84, in make_c_thunk
outputs = cl.make_thunk(
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1185, in make_thunk
cthunk, module, in_storage, out_storage, error_storage = self.__compile__(
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1102, in __compile__
thunk, module = self.cthunk_factory(
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1626, in cthunk_factory
module = cache.module_from_key(key=key, lnk=self)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1217, in module_from_key
module = self._get_from_hash(module_hash, key)
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1122, in _get_from_hash
key_data.add_key(key, save_pkl=bool(key[0]))
File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 550, in add_key
assert key not in self.keys
AssertionError
"""
The above exception was the direct cause of the following exception:
AssertionError Traceback (most recent call last)
File <timed exec>:24
File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:4065, in SpectralNormativeModel.adapt_fit(self, covariate_to_adapt, new_category_names, encoded_train_data, covariates_dataframe, pretrained_model_params, n_jobs, save_directory, save_separate)
4062 pretrained_model_params = copy.deepcopy(self.model_params)
4064 # Fit the adapted model
-> 4065 self.fit(
4066 encoded_train_data,
4067 covariates_dataframe,
4068 n_modes=pretrained_model_params["n_modes"],
4069 n_jobs=n_jobs,
4070 save_directory=save_directory,
4071 save_separate=save_separate,
4072 covariance_structure=pretrained_model_params["sparse_covariance_structure"],
4073 adapt={
4074 "covariate_to_adapt": covariate_to_adapt,
4075 "new_category_names": new_category_names,
4076 "pretrained_model_params": pretrained_model_params,
4077 },
4078 )
File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:3933, in SpectralNormativeModel.fit(self, encoded_train_data, covariates_dataframe, n_modes, n_jobs, save_directory, save_separate, covariance_structure, adapt)
3926 utils.general.prepare_save_directory(
3927 save_directory,
3928 "spectral_normative_model",
3929 )
3931 logger.info("Step 1; direct models for each eigenmode (%s modes)", n_modes)
-> 3933 self.fit_all_direct(
3934 encoded_train_data=encoded_train_data,
3935 covariates_dataframe=covariates_dataframe,
3936 n_modes=n_modes,
3937 n_jobs=n_jobs,
3938 save_directory=save_directory,
3939 save_separate=save_separate,
3940 adapt=adapt,
3941 )
3943 logger.info("Step 2; identify sparse covariance structure")
3945 self.identify_covariance_structure(
3946 encoded_train_data=encoded_train_data,
3947 covariates_dataframe=covariates_dataframe,
(...)
3950 adapt=adapt,
3951 )
File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:3688, in SpectralNormativeModel.fit_all_direct(self, encoded_train_data, covariates_dataframe, n_modes, n_jobs, save_directory, save_separate, adapt)
3659 # Fit the base direct model for each eigenmode using parallel processing
3660 tasks = (
3661 joblib.delayed(self.fit_single_direct)(
3662 variable_of_interest=encoded_train_data[:, i],
(...)
3685 for i in range(n_modes)
3686 )
3687 self.direct_model_params = list(
-> 3688 utils.parallel.ParallelTqdm(
3689 n_jobs=n_jobs,
3690 total_tasks=n_modes,
3691 desc="Fitting direct models",
3692 initializer=utils.general.set_unique_pytensor_compiledir,
3693 )(tasks), # pyright: ignore[reportCallIssue]
3694 )
File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/utils/parallel.py:88, in ParallelTqdm.__call__(self, iterable)
86 self.total_tasks = len(iterable)
87 # call parent function
---> 88 return super().__call__(iterable)
89 finally:
90 # close tqdm progress bar
91 if self.progress_bar is not None:
File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:2072, in Parallel.__call__(self, iterable)
2066 # The first item from the output is blank, but it makes the interpreter
2067 # progress until it enters the Try/Except block of the generator and
2068 # reaches the first `yield` statement. This starts the asynchronous
2069 # dispatch of the tasks to the workers.
2070 next(output)
-> 2072 return output if self.return_generator else list(output)
File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1682, in Parallel._get_outputs(self, iterator, pre_dispatch)
1679 yield
1681 with self._backend.retrieval_context():
-> 1682 yield from self._retrieve()
1684 except GeneratorExit:
1685 # The generator has been garbage collected before being fully
1686 # consumed. This aborts the remaining tasks if possible and warn
1687 # the user if necessary.
1688 self._exception = True
File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1784, in Parallel._retrieve(self)
1778 while self._wait_retrieval():
1779 # If the callback thread of a worker has signaled that its task
1780 # triggered an exception, or if the retrieval loop has raised an
1781 # exception (e.g. `GeneratorExit`), exit the loop and surface the
1782 # worker traceback.
1783 if self._aborting:
-> 1784 self._raise_error_fast()
1785 break
1787 nb_jobs = len(self._jobs)
File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1859, in Parallel._raise_error_fast(self)
1855 # If this error job exists, immediately raise the error by
1856 # calling get_result. This job might not exists if abort has been
1857 # called directly or if the generator is gc'ed.
1858 if error_job is not None:
-> 1859 error_job.get_result(self.timeout)
File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:758, in BatchCompletionCallBack.get_result(self, timeout)
752 backend = self.parallel._backend
754 if backend.supports_retrieve_callback:
755 # We assume that the result has already been retrieved by the
756 # callback thread, and is stored internally. It's just waiting to
757 # be returned.
--> 758 return self._return_or_raise()
760 # For other backends, the main thread needs to run the retrieval step.
761 try:
File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:773, in BatchCompletionCallBack._return_or_raise(self)
771 try:
772 if self.status == TASK_ERROR:
--> 773 raise self._result
774 return self._result
775 finally:
AssertionError:
Do you have any idea why this might be happening?