Hello there,
I am currently trying to scale out-of-sample predictions in PySpark. Suppose that:
- I have a pretrained model that I've trained and stored in Unity Catalog (as an InferenceData object, my "idata"). The model takes a pandas DataFrame of covariates (not a PySpark DataFrame) and outputs interval-level predictions, so it behaves like a pd.DataFrame → pd.DataFrame function
- I have an mlflow.pyfunc wrapper that loads my idata and uses the trace to sample the posterior predictive, storing the draws as a new idata of predictions
- My wrapper function then reshapes/resizes the values from the previous step and calculates the corresponding interval predictions (a rough sketch of the wrapper follows this list)
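For reference, the wrapper is roughly along the lines of the sketch below; build_my_pymc_model and summarise_intervals are placeholders for my actual model factory and interval-summary code, and the artifact key is just illustrative:

import arviz as az
import mlflow.pyfunc
import pandas as pd
import pymc as pm

class IntervalPredictor(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        # load the stored idata (InferenceData) from the logged artifacts
        self.idata = az.from_netcdf(context.artifacts["idata"])

    def predict(self, context, model_input: pd.DataFrame) -> pd.DataFrame:
        # rebuild the PyMC model around the new covariates, sample the
        # posterior predictive from the stored trace, then collapse the
        # draws into interval columns (both helpers below are placeholders)
        with build_my_pymc_model(model_input):
            ppc = pm.sample_posterior_predictive(self.idata)
        return summarise_intervals(ppc)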
Now, I have tested that my model and the corresponding pyfunc wrapper work as expected. Next, I want to:
- Create a very large covariate frame in pyspark
- Partition it row-wise into n partitions
- For each partition, run my predict function
- Write the predictions to a delta table
- End
I have been attempting to use mapInPandas(), since my prediction is not scalar in nature, as shown below:
import mlflow

test_covariate_frame = ...  # my frame of covariates

def predict_partition(iterator):
    # load the registered pyfunc model once per partition
    model = mlflow.pyfunc.load_model("my hopefully cool model")
    for pdf in iterator:
        preds = model.predict(pdf)
        pdf["predictions"] = preds
        yield pdf

predictions = (
    test_covariate_frame
    .repartition(10)
    .mapInPandas(
        predict_partition,
        schema=test_covariate_frame.schema.add("predictions", "double"),
    )
)

display(predictions)
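Once that works, the final step would be to write the result out, roughly like this (the table name is just a placeholder):

(
    predictions
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("my_catalog.my_schema.my_predictions")
)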
The reason I am using mapInPandas() here is that, according to the docs, each partition is converted into a pandas DataFrame on which all intermediate calculations are then run. The docs state that I can run an arbitrary Python function, as opposed to some of the other map()-like PySpark functions that are intended for scalar output.
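To illustrate what I mean by scalar output (toy example, the doubling is just a stand-in for a per-row point estimate):

from pyspark.sql.functions import pandas_udf
import pandas as pd

# a scalar-style pandas UDF maps one column to one column, row by row
@pandas_udf("double")
def point_prediction(x: pd.Series) -> pd.Series:
    return x * 2.0

# whereas mapInPandas hands my function each partition as a whole pandas
# DataFrame and lets me return a DataFrame, which is what I need for
# interval predictions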
Sadly, I am getting some generic-looking PySpark errors, and I'm at my wit's end as to what could be causing the issue. Some things I have considered:
- Multi-threading issues on single-core workers arising from pm.sample_posterior_predictive (which I have ruled out)
- Bad imports from relying on DBFS as opposed to Unity Catalog
- An incorrect pyfunc definition (unlikely, as I can load my model and call my pyfunc wrapper without any issue)
Has anyone attempted to 'chunkify' their covariate frames for out-of-sample prediction using PySpark? If so, what are your recommendations?
Thanks!