USING DATAFUSION'S UDAFS TO DO ML TRAINING

By Dan Lovell, Hussain Sultan | February 09, 2024

Welcome back to LETSQL’s exploration series! Today, we’re tackling a common yet complex challenge in data science workflows: enabling efficient groupby-apply operations, akin to pandas, within DataFusion. This capability is a key enabler for applying machine learning models directly within our data processing pipelines, using the database’s compute machinery. In this second blog post, we’ll teach DataFusion to do pandas-style groupby-apply aggregation rather than accumulation. To demonstrate the capability, we train a simple XGBoost model on each group and output the top features of each trained model. Then we’ll show how the same underlying Python aggregation function can be used in both pandas and DataFusion, a step toward a multi-engine DataFrame API.

You can find the complete code on GitHub.

Introduction

[Figure: split-apply-combine]

User-Defined Aggregate Functions (UDAFs) are emerging as essential tools for sophisticated analytics on subsets of data. In this post, we use XGBoost model training as an example of a complex operation that is well suited to UDAF computation. However, there are two challenges:

  1. Creating a UDAF often involves hardcoding the aggregation computation process.
  2. UDAF’s operations are generally not fully transparent to the query processor, limiting the system’s ability to optimize queries effectively.

These challenges underscore the need for a more flexible and efficient approach to aggregating data, which we aim to address through our work with DataFusion.

Many systems provide UDAF functionality via accumulators, which receive incremental subsets of the data rather than all of it at once. This can be more efficient if your operation is expressible as an accumulation (think of something like a sufficient statistic). However, for many modeling use cases, we need simultaneous access to all of the data to be processed. Our use case is gradient boosting via XGBoost, which does indeed need all the data at once.
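To make the distinction concrete, here is a minimal pure-Python sketch (the class and variable names are ours, purely for illustration). A mean can be accumulated from a small mergeable state, while an exact median cannot be rebuilt from per-batch medians, which is the situation model training puts us in.

```python
from statistics import median


class MeanAccumulator:
    """The mean is accumulable: (total, count) is a sufficient statistic."""

    def __init__(self):
        self.total = 0.0
        self.count = 0

    def update(self, batch):
        # Fold one batch into the running state; the raw rows can then be dropped.
        self.total += sum(batch)
        self.count += len(batch)

    def merge(self, other):
        # Partial states from different partitions combine without the raw rows.
        self.total += other.total
        self.count += other.count

    def evaluate(self):
        return self.total / self.count


batches = [[1.0, 2.0, 3.0], [10.0], [4.0, 5.0]]

left, right = MeanAccumulator(), MeanAccumulator()
left.update(batches[0])
right.update(batches[1])
right.update(batches[2])
left.merge(right)
mean_from_state = left.evaluate()

# Per-batch medians do not combine into the overall median,
# so an exact median needs every row available at evaluation time.
all_rows = [x for batch in batches for x in batch]
overall_median = median(all_rows)
median_of_medians = median(median(batch) for batch in batches)
```

Here `overall_median` and `median_of_medians` disagree, even though the accumulated mean matches the mean of all rows.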

Problem Statement

We have some loan data and want to find how the most salient features for predicting a target vary over time. To do that, we train one XGBoost model per year and extract the n “best features” for each year.

We’d like the user experience to look as close as possible to pandas. Below is our pseudo-code target for how we might do it in pure pandas, how it changes when we do it in pandas via Ibis, and how we’d like to be able to do it in DataFusion via Ibis. We’ll use this as a guide to build our library.

And here’s code for a toy model that fleshes out the pure pandas pseudocode [1]

Our goal is to teach DataFusion how to use the same curried_calc_best_features function that we use in pandas.
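As a rough illustration of the pattern (not the post’s actual listing), the sketch below binds the configuration of a “best features” function up front and hands the result to a pandas groupby-apply. Several things here are assumptions made for the example: the scoring rule is absolute correlation standing in for XGBoost feature importance, `functools.partial` stands in for whatever currying helper is used, and the synthetic data and the `year`, `a`, `b`, `c` column names are made up.

```python
from functools import partial

import numpy as np
import pandas as pd


def calc_best_features(df, candidates, target, n):
    """Return the n candidate columns most correlated (in absolute value) with target.

    Stand-in scoring rule: a real version would train a model per group instead.
    """
    scores = df[candidates].corrwith(df[target]).abs()
    top = scores.sort_values(ascending=False).head(n)
    return [{"feature": name, "score": float(score)} for name, score in top.items()]


# Synthetic data: the target follows "a" in 2015 and "b" in 2016.
rng = np.random.default_rng(0)
rows = 200
df = pd.DataFrame({
    "year": np.repeat([2015, 2016], rows // 2),
    "a": rng.normal(size=rows),
    "b": rng.normal(size=rows),
    "c": rng.normal(size=rows),
})
driver = np.where(df["year"] == 2015, df["a"], df["b"])
df["event_occurred"] = driver + 0.1 * rng.normal(size=rows)

# Bind everything except the per-group DataFrame.
curried_calc_best_features = partial(
    calc_best_features, candidates=["a", "b", "c"], target="event_occurred", n=2
)

# One call per year; each group arrives as an ordinary pandas DataFrame.
columns = ["a", "b", "c", "event_occurred"]
best_by_year = df.groupby("year")[columns].apply(curried_calc_best_features)
```

The function only ever sees a DataFrame for one group, which is what lets the same callable be reused by another engine that can hand it a group’s rows.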

Interlude

[Figure: ibis-datafusion]

We’ll use Ibis to target both pandas and DataFusion. Ibis aims to be the portable Python dataframe library, allowing you to specify your computation once and target multiple engines, including query engines like DataFusion, and pandas. It is designed to be a high-level API for data manipulation and analysis, and can be used to build complex data processing pipelines.

In our case, we are using Ibis to register UDAFs with DataFusion and pandas, generate deferred expressions with a pandas-like API, and execute them on multiple backends. This requires us to register a UDAF with Ibis for each backend.

For the pandas backend, we can automate udaf registration for a function with a known input table like this

We’d like to be able to create the udaf and invoke Ibis with the DataFusion backend in a similar way.

Implementation

We know what we want the user experience to feel like, but what is the DataFusion baseline?

Per the DataFusion example of a udaf, first, you define a class that inherits from Accumulator

You then register this class with DataFusion, specifying additional information (some types and volatility)

We can’t (and don’t want to) update and merge models, so what’s to be done? We could try to accumulate the rows into a ListArray<Struct>, but this would be computationally costly (think of it like doing a transpose, only worse). Instead, we’ll accumulate the data as serialized RecordBatches. This is surprisingly cheap because of the underlying Arrow representation.

A first pass at our Accumulator might look something like this

What’s happening here?

  • _states is a list of serialized StructArray
  • state simply returns the accumulator’s list of serialized StructArray, _states, cast into its pyarrow type
  • update combines the columns’ RecordBatches into a single StructArray, serializes it with Python’s pickle and extends _states
  • merge simply extends _states with other Accumulators’ _states
  • pystate converts all the data in _states into a single pandas DataFrame
  • pyevaluate is where the computation actually happens: invoke our pure pandas style function on the DataFrame returned by pystate
  • evaluate invokes pyevaluate and casts its return value to return_type

Stay DRY, my friends

Note that almost everything here is boilerplate, with the exception of

  • Our aggregation function: curried_calc_best_features
  • The pyarrow type used to cast in evaluate
  • The variable column_names used by update

Note also that we have to coordinate names and types between the Ibis table, udaf definition, and udaf invocation.

Let’s combine all of this into a single function that dynamically generates a class and registers it with both DataFusion and Ibis.

The DataFusion-related section of our final library code will look like this

Now, via make_datafusion_udaf, we only need to provide

  • the Ibis table the udaf will be invoked on (used to extract names and types)
  • the function to invoke on each group’s pandas DataFrame
  • the return type of the function

We can optionally provide a name for the udaf that DataFusion uses internally and Ibis will use for the SQL string it generates for the DataFusion backend.
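The “generate a class dynamically” step can be illustrated in isolation with Python’s built-in `type()`. The sketch below is deliberately simplified and is not `make_datafusion_udaf`: the helper name `make_accumulator_class` is hypothetical, rows are kept as plain dicts instead of Arrow data, and there is no registration with DataFusion or Ibis.

```python
class BaseGroupAccumulator:
    """Boilerplate shared by every generated class (simplified: rows kept as dicts)."""

    fn = None
    column_names = ()

    def __init__(self):
        self._rows = []

    def update(self, *columns):
        # Zip the per-column values back into rows keyed by column name.
        for values in zip(*columns):
            self._rows.append(dict(zip(self.column_names, values)))

    def merge(self, other_rows):
        self._rows.extend(other_rows)

    def state(self):
        return list(self._rows)

    def evaluate(self):
        return self.fn(self._rows)


def make_accumulator_class(fn, column_names, name=None):
    """Generate a subclass that differs from the base only in the varying parts."""
    name = name or f"{fn.__name__}_accumulator"
    attrs = {"fn": staticmethod(fn), "column_names": tuple(column_names)}
    return type(name, (BaseGroupAccumulator,), attrs)


def total_interest(rows):
    # Hypothetical aggregation over the whole group.
    return sum(row["amount"] * row["rate"] for row in rows)


TotalInterest = make_accumulator_class(total_interest, ["amount", "rate"])
acc = TotalInterest()
acc.update([100.0, 200.0], [0.5, 0.25])
result = acc.evaluate()
```

Only the function, the column names and the optional name vary between generated classes, which is why a single factory can replace a hand-written class per aggregation.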

But, does it work?

Ok, let’s invoke this on some real data! We’ll use the LendingClub data from Kaggle and try to determine which features are predictive of a loan not being “Fully Paid” by the end of its standard term. We’ve already massaged the data to include our target variable, event_occurred, as well as some other derived variables like dti and cr_age_days.

New Possibilities

Now that we have the infrastructure to create scalar values that are in fact multiple rows, we could conceivably create rows that represent all the data from a group, allowing us to run analytic window functions over time periods: for example, a rolling regression that outputs one model per month with the trailing 12 months as training data. Moreover, now that the UDAF is part of DataFusion’s planning layer, it opens up possibilities for optimization by rewriting the plan with a specialized operator. But we’ll leave this digression for another time.
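To picture what “one model per month with the trailing 12 months as training data” means, here is a small pure-pandas sketch. It says nothing about how this would be implemented in DataFusion; the data and the `month` and `y` column names are made up, and a window mean stands in for fitting a model.

```python
import pandas as pd

monthly = pd.DataFrame({
    "month": pd.period_range("2020-01", periods=18, freq="M"),
    "y": [float(i) for i in range(18)],
})


def trailing_windows(df, months=12):
    """Yield (end_month, rows from the trailing `months` months ending at end_month)."""
    for end in sorted(df["month"].unique()):
        start = end - (months - 1)
        yield end, df[(df["month"] >= start) & (df["month"] <= end)]


# Stand-in for "fit one model per window": the window mean of y.
fits = {str(end): window["y"].mean() for end, window in trailing_windows(monthly)}
```

Each window overlaps its neighbours, so unlike a plain groupby, the same row contributes to several outputs.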

Conclusion

We’ve taught DataFusion how to do pandas-style aggregation [2], demonstrated the ability to use the same function for both a pandas groupby-apply and datafusion-python’s groupby-apply (by way of Ibis), and laid the groundwork for doing windows-over-groups.

Why DataFusion: DataFusion’s UDF capabilities offer a game-changing advantage in the world of data science. By enabling SQL users to tap into powerful ML pipelines, we bridge the gap between data manipulation and data analysis. This is a huge win for data scientists and data analysts who can now use their existing SQL skills to build and deploy ML pipelines. Moreover, DataFusion provides the building blocks and primitives to be able to utilize the optimizations without diving too deep into database internals or recreating the foundations for a new database. In LETSQL’s case, we are extending DataFusion to add ML focused UDFs and new DataTypes that are amenable to Tensor processing, necessary for performing ML.

The Power of SQL in Data Science: SQL’s declarative nature and widespread adoption make it an indispensable tool in the data science toolkit. Our efforts aim to simplify the integration of complex data processes into SQL, enhancing accessibility and efficiency across data workflows.

Future work: LETSQL

As we continue to refine our DataFrame API, future explorations will delve into end-to-end optimization of machine learning pipelines, leveraging the insights gained from this and our recent post on XGBoost scoring with UDFs. Stay tuned for our next installment, where we’ll unveil new advancements in making data science more accessible, efficient, and powerful than ever before.

Your thoughts and feedback are invaluable as we navigate this journey. Share your experiences, questions, or suggestions in the comments below or on our community forum. Together, let’s redefine the boundaries of data science and machine learning integration.

To subscribe to our newsletter, visit letsql.dev.

Footnotes

  1. XGBoost can give you deterministic results for the same input with seed control. One wrinkle is that DataFusion’s accumulation process results in a different ordering of the data than in pandas. To get the same results, we add a column, rownum, that we can sort by to ensure the same XGBoost input order.

  2. Along the way, we also had to teach DataFusion how to build a struct with non-uniform field types and how to accept UDFs with multiple input columns.