By Hussain Sultan | June 9, 2025
Often, real-world data workflows don’t fit neatly into SQL or standard relational operations. You need custom transformations, complex aggregations, multi-step logic, and arbitrary processing—but current tools force you to fragment these workflows across teams and systems. Team A builds processed tables, Team B builds models, Team C runs inference—each step opaque to the others, making end-to-end reasoning difficult.
In this article, we present a solution that simplifies data pipeline use cases without building pipelines at all. Instead, we use the open source Xorq framework to define a composite data engine built on four UDF expression types that let you model any real-world data pattern as a composable expression: row-wise transformations (scalar UDFs), aggregations (UDAFs), UDFs derived from other expressions (ExprScalarUDFs), and new user-defined exchange functions (UDXFs). UDXFs can take input and produce output of any schema shape, and they subsume the other UDF types, so you can compose workflows that perform arbitrary logic on your data. Instead of breaking workflows apart, you can express complete business logic as a single, coherent expression.
The Xorq framework is a Python framework for building composite data engines. Powered by Ibis, Apache DataFusion, and Apache Arrow, it provides a declarative syntax for defining custom query engines capable of applying any processing to any data, regardless of its source. The approach greatly simplifies most data science and AI data engineering use cases.
The Xorq framework supports four UDF types that can be used to model any real-world data workflow as pure expressions. Whether you’re doing custom transformations, complex aggregations, or chaining multi-step business logic, these UDF primitives let you express complete workflows without fragmenting across tools or teams. Every expression carries an explicit schema—the contract that lets the optimizer push work to the best engine and makes the entire workflow transparent to any team member.
If you caught Sev Leonard’s PyCon US talk “Schemas All the Way Down”, you already know the punchline: well-defined schemas turn messy data plumbing into a composable, predictable system. Xorq bakes that principle in from the first line of code.
Let’s illustrate the power of composable data processing by walking through an ML training and inference scenario that leverages multiple UDF types, including a new type, the User-Defined Exchange Function (UDXF). Throughout the example, we’ll work with a lending club dataset containing loan applications with features like:
import pandas as pd
import xorq as xo
import xorq.expr.datatypes as dt
loans = xo.deferred_read_parquet(
    xo.connect(),
    xo.config.options.pins.get_path("lending-club"),
)
Let’s see how different UDF types help us analyze and model this data.
Scalar UDFs are the simplest type of user-defined function—they operate on individual rows, taking one or more column values as input and returning a single value. Think of them as the equivalent of applying a Python function to each row of a DataFrame.
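As a rough illustration of that row-wise contract outside Xorq, the plain-pandas sketch below applies a hypothetical `income_band` function to each row. The function name, thresholds, and sample values are assumptions made up for this example; they are not part of Xorq or the lending club data.

```python
import pandas as pd

def income_band(row: pd.Series) -> str:
    """Hypothetical row-wise rule: it looks only at the values of one row."""
    if row["annual_inc"] < 30_000:
        return "low"
    if row["annual_inc"] < 50_000:
        return "medium"
    return "high"

# Made-up sample rows, not taken from the lending club dataset
sample = pd.DataFrame({"annual_inc": [25_000.0, 42_000.0, 90_000.0]})

# One output value per input row, each computed independently of the others
bands = sample.apply(income_band, axis=1)
```

The point of the sketch is the shape of the contract: the output has exactly as many values as the input has rows, and no value depends on any other row.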
When to use Scalar UDFs:
In this example, we use the Xorq library and its declarative data processing syntax to define a UDF expression that calculates risk scores:
from xorq.expr.udf import make_pandas_udf
def calculate_risk_score(df: pd.DataFrame) -> pd.Series:
    """Calculate a numerical risk score."""
    base_score = 100.0
    # Start with base score for all rows
    scores = pd.Series([base_score] * len(df), index=df.index)
    # Apply income factors
    scores.loc[df['annual_inc'] < 30000] *= 1.8
    scores.loc[(df['annual_inc'] >= 30000) & (df['annual_inc'] < 50000)] *= 1.3
    # Apply credit score factors
    scores.loc[df['fico_range_high'] < 650] *= 1.6
    scores.loc[(df['fico_range_high'] >= 650) & (df['fico_range_high'] < 700)] *= 1.2
    return scores
risk_score_udf = make_pandas_udf(
    calculate_risk_score,
    schema=xo.schema({"annual_inc": "float64", "fico_range_high": "int64"}),
    return_type=dt.float64,
    name="risk_score",
)
enriched_loans = (
    loans
    .mutate(credit_score=risk_score_udf.on_expr)
)
UDAFs take multiple rows as input and produce a single output value, similar to SQL’s SUM or AVG functions, but with arbitrary Python logic, as in a pandas group-by/apply. They maintain state across all rows in a group and can implement sophisticated algorithms while still fitting naturally into declarative query plans.
UDAFs are a good fit for reductions such as training an ML model, where an N-row x M-column group produces a 1-row x 1-column output.
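To make the "N rows in, one value out" shape concrete, here is a minimal plain-pandas sketch of a group-by reduction that returns a single pickled object per group. The `grade` column, the `fit_mean_model` helper, and the sample values are hypothetical, and the "model" is just a dictionary standing in for a trained estimator.

```python
import pickle
import pandas as pd

def fit_mean_model(group: pd.DataFrame) -> bytes:
    """Stand-in 'model': the mean event rate of the group, pickled to bytes."""
    model = {
        "default_rate": float(group["event_occurred"].mean()),
        "n_rows": len(group),
    }
    return pickle.dumps(model)

# Made-up sample data; "grade" is a hypothetical grouping column
sample = pd.DataFrame({
    "grade": ["A", "A", "B", "B", "B"],
    "event_occurred": [0, 1, 1, 1, 0],
})

# Each N-row group collapses to a single binary value
models = {grade: fit_mean_model(group) for grade, group in sample.groupby("grade")}
restored = pickle.loads(models["B"])
```

Serializing the result to bytes is what lets an arbitrary Python object fit into a single cell of a single column.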
When to use UDAFs:
Here we build a UDAF expression that trains an XGBoost model on the rows it aggregates:
import numpy as np
import xgboost as xgb
import pickle
import toolz
from xorq.expr.udf import agg
from xorq.common.utils.toolz_utils import curry
features = ("emp_length", "dti", "annual_inc", "loan_amnt", "fico_range_high", "cr_age_days")
target = "event_occurred"
@curry
def train_xgboost_model(df, features=features, target=target, seed=0):
    """Train an XGBoost model on loan data."""
    param = {"max_depth": 4, "eta": 0.1, "objective": "binary:logistic", "seed": seed}
    num_round = 50
    X = df[list(features)].fillna(0)
    y = df[target]
    dtrain = xgb.DMatrix(X, y)
    bst = xgb.train(param, dtrain, num_boost_round=num_round)
    return bst
# Pickle the trained booster so the aggregate returns a single binary value
model_udaf = agg.pandas_df(
    fn=toolz.compose(pickle.dumps, train_xgboost_model),
    schema=loans[features + (target,)].schema(),
    return_type=dt.binary,
    name="model",
)
model_expr = (
    enriched_loans
    .agg([
        model_udaf.on_expr(enriched_loans).name("model")
    ])
)
Note: Xorq also supports the User-Defined Window Function (UDWF), which works similarly to a UDAF but keeps state across windows or over time (see example).
Here’s where things get really interesting. Expression Scalar UDFs (ExprScalarUDF) allow you to define UDFs that are derived from other expressions. This means you can declaratively express a UDF that depends on the result of a UDAF: a single expression can contain the train/test split, model training and inference.
This composability is what makes composite query engines created with Xorq truly powerful: instead of breaking your logic into separate steps with intermediate storage, you can express complex workflows as single, coherent expressions.
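The dependency between the two steps can be sketched in plain Python, with no Xorq involved: an aggregation produces one value, and that value is bound as an extra argument of a row-wise function. The `train` and `predict` functions here are toy stand-ins chosen for illustration, and unlike a deferred expression this sketch runs eagerly.

```python
from functools import partial

def train(rows):
    """Aggregation step: reduce many rows to one 'model' (here, their mean)."""
    return sum(rows) / len(rows)

def predict(model, row):
    """Row-wise step that needs the aggregated result as an extra argument."""
    return row - model

train_rows = [10.0, 20.0, 30.0]
test_rows = [15.0, 25.0]

# Bind the aggregate result once, then apply the resulting function per row
predict_with_model = partial(predict, train(train_rows))
predictions = [predict_with_model(row) for row in test_rows]
```

An ExprScalar UDF expresses the same relationship declaratively, so the training step and the prediction step live in one expression instead of two separately run programs.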
When to use ExprScalar UDFs:
Here’s a practical example that applies the XGBoost model trained above to make predictions:
from xorq.expr.udf import make_pandas_expr_udf
@curry
def predict_xgboost_model(model, df, features=features):
    return model.predict(xgb.DMatrix(df[list(features)]))
# Step 1: Load data and create train/test splits
t = xo.deferred_read_parquet(
    xo.connect(), xo.config.options.pins.get_path("lending-club")
)
(train, test) = xo.train_test_splits(
    t,
    unique_key="rownum",
    test_sizes=0.7,
    random_seed=42,
)
# Step 2: ExprScalar UDF that takes the trained model as a computed parameter
predict_expr_udf = make_pandas_expr_udf(
    computed_kwargs_expr=model_udaf.on_expr(train),  # This is the magic!
    fn=predict_xgboost_model,
    schema=t[features].schema(),
    return_type=dt.dtype("float32"),
    name="predicted",
)
# Step 3: Apply predictions to test data using the trained model
expr = test.mutate(predict_expr_udf.on_expr(test).name("predicted"))
# Execute the entire pipeline
result = expr.execute()
Finally, we propose a fourth UDF type, made possible by the Xorq framework: the UDXF, which can accept input of any schema shape and return output of any schema shape. Xorq uses an Apache Arrow Flight server to exchange data with clients. UDXFs are a declarative escape hatch for composing relational pipelines with real-world applications: the only things we need to know are the schemas (the shape of the data) going in and coming out. UDXFs are also special in that they execute in a lightweight Flight server that does not have to be in-process, which makes them portable.
import pandas as pd
import xorq as xo
from random import Random
def do_risk_scenarios(df: pd.DataFrame):
    # this could be an API call or something else impure
    return pd.DataFrame(
        (
            {
                "loan_id": tpl.rownum,
                "scenario_id": scenario_id,
                "scenario_type": scenario_type,
                "adjusted_score": tpl.credit_score * (factor + Random(tpl.rownum + scenario_id).uniform(-0.1, 0.1)),
            }
            for tpl in df.itertuples()
            for (scenario_id, scenario_type, factor) in [
                (0, "base", 1.0),
                (1, "stress", 1.5),
                (2, "optimistic", 0.8),
            ]
        )
    )
schema_in = xo.schema({"rownum": "int64", "credit_score": "float64"})
schema_out = xo.schema({
    "loan_id": "int64",
    "scenario_id": "int64",
    "scenario_type": "string",
    "adjusted_score": "float64",
})
risk_scenarios_udxf = xo.expr.relations.flight_udxf(
    process_df=do_risk_scenarios,
    maybe_schema_in=schema_in.to_pyarrow(),
    maybe_schema_out=schema_out.to_pyarrow(),
    name="risk_scenarios",
)
# Apply to our enriched loans data
risk_scenarios = (
    enriched_loans
    .select(["rownum", "credit_score"])
    .pipe(risk_scenarios_udxf)
)
risk_scenarios.execute()
Comparison with User-Defined Table Functions (UDTFs):
UDXFs are similar to stateful UDTFs, whose output can depend on the complete input batch rather than on individual rows. They differ in that they also describe a Flight service, which makes them portable. In that sense, they are closer to External Functions.
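The "schema in, schema out" contract can be illustrated with a small in-process pandas sketch. It does not use Flight or any Xorq API: `check_schema`, `SCHEMA_IN`, `SCHEMA_OUT`, and `expand_scenarios` are hypothetical names, and the scenario factors are copied from the example above without the random jitter.

```python
import pandas as pd

# Hypothetical schema contracts, written as column name -> pandas dtype name
SCHEMA_IN = {"rownum": "int64", "credit_score": "float64"}
SCHEMA_OUT = {"loan_id": "int64", "scenario_id": "int64", "adjusted_score": "float64"}

def check_schema(df: pd.DataFrame, schema: dict) -> None:
    """Raise if the frame's columns or dtypes differ from the declared schema."""
    actual = {name: str(dtype) for name, dtype in df.dtypes.items()}
    if actual != schema:
        raise TypeError(f"schema mismatch: expected {schema}, got {actual}")

def expand_scenarios(df: pd.DataFrame) -> pd.DataFrame:
    """n rows in, 3n rows out, with a different set of columns."""
    check_schema(df, SCHEMA_IN)
    out = pd.DataFrame(
        [
            {
                "loan_id": row.rownum,
                "scenario_id": scenario_id,
                "adjusted_score": row.credit_score * factor,
            }
            for row in df.itertuples()
            for scenario_id, factor in [(0, 1.0), (1, 1.5), (2, 0.8)]
        ]
    )
    check_schema(out, SCHEMA_OUT)
    return out

scores = pd.DataFrame({"rownum": [1, 2], "credit_score": [100.0, 180.0]})
scenarios = expand_scenarios(scores)
```

Because the function is described only by the two schemas, the caller does not need to know how the rows are produced, which is what allows the same contract to be served from another process.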
| Feature | Scalar UDF | UDAF | UDWF | UDXF |
|---|---|---|---|---|
| Processing Pattern | Row by row | Aggregation | Window | Arbitrary transformation |
| State Management | Stateless | Aggregation state | Window state | Arbitrary state |
| Schema | 1 row x m cols in / 1 row x 1 col out | n rows x m cols in / 1 row x 1 col out | n rows x m cols in / 1 row x 1 col out (once per row) | n rows x m cols in / y rows x z cols out |
| Execution Context | In-process | In-process | In-process | Arrow Flight service (potentially remote) |
| Best for | Inference / Functions | ML Training | Time-series analysis | Compute offloading, complex logic, arbitrary output shapes |
Here’s a quick decision tree to help you choose the right UDF type for your use case:
The key distinction is that UDXFs are generic functions that can implement the behavior of any other function type, with the added flexibility of arbitrary data transformations and potential execution outside Xorq.
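One possible reading of the comparison table can be written down as a small helper. This is only a sketch derived from the table rows: the function `choose_udf_type` and its argument values are hypothetical and are not part of Xorq.

```python
def choose_udf_type(output_shape: str, out_of_process: bool = False) -> str:
    """Pick a UDF type from the output shape, following the comparison table.

    output_shape is one of:
      "per_row"        - one value per input row
      "per_group"      - one value per group of rows
      "per_window_row" - one value per row, computed over a window
      "arbitrary"      - any number of rows and columns
    """
    # Anything that must run outside the engine, or reshape freely, is a UDXF
    if out_of_process or output_shape == "arbitrary":
        return "UDXF"
    if output_shape == "per_group":
        return "UDAF"
    if output_shape == "per_window_row":
        return "UDWF"
    if output_shape == "per_row":
        return "Scalar UDF"
    raise ValueError(f"unknown output shape: {output_shape!r}")

training_choice = choose_udf_type("per_group")
offload_choice = choose_udf_type("per_row", out_of_process=True)
```

The second call reflects the point made above: a UDXF can stand in for any of the other types when the work has to run elsewhere.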
The lending club example shows how you can start with basic row-wise operations and progressively build more sophisticated analytics—all while maintaining the declarative nature that makes pipelines easy to reason about, test, and optimize.