ML Pipelines 
From Another Planet

Do-anything, run-anywhere Python UDFs that simplify and accelerate ML workflows and development.

ML Pipelines? 
More like ML Wormholes

Multi-engine ML pipelines made simple

No more time-consuming ML pipeline development. xorq’s multi-engine system seamlessly moves data between query engines, allowing you to leverage the strengths of each engine within a unified workflow.

  • Orchestrate flows via a simple declarative Pythonic scripting language
  • Fast, transparent data movement between engines
  • Batching, memory management handled for you
  • No serialization/deserialization overhead
  • Supports Snowflake, Trino, Pandas, DuckDB, Postgres, and many others
import xorq as xo
from xorq.expr.relations import into_backend

# Connect to different engines
pg = xo.postgres.connect_env()
db = xo.duckdb.connect()

# Get tables from different sources
batting = pg.table("batting")

# Load awards_players into DuckDB
awards_players = xo.examples.awards_players.fetch(backend=db)

# Filter data in respective engines
left = batting.filter(batting.yearID == 2015)
right = awards_players.filter(awards_players.lgID == "NL").drop("yearID", "lgID")

# Move right table into postgres for efficient join
expr = left.join(
    into_backend(right, pg),
    ["playerID"],
    how="semi"
)[["yearID", "stint"]]

# Execute the multi-engine query
result = expr.execute()
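The into_backend pattern above can be sketched with a stdlib-only analogue: two sqlite3 connections stand in for two query engines, and the filtered rows are materialized in the destination engine so the semi-join runs locally (sqlite3 and the table contents here are illustrative stand-ins, not part of xorq):

```python
import sqlite3

# Two separate in-memory databases stand in for two query engines.
src = sqlite3.connect(":memory:")
dst = sqlite3.connect(":memory:")

src.execute("CREATE TABLE awards (playerID TEXT, lgID TEXT)")
src.executemany("INSERT INTO awards VALUES (?, ?)",
                [("a1", "NL"), ("a2", "AL"), ("a3", "NL")])

dst.execute("CREATE TABLE batting (playerID TEXT, yearID INT)")
dst.executemany("INSERT INTO batting VALUES (?, ?)",
                [("a1", 2015), ("a2", 2015), ("a4", 2015)])

# "into_backend": filter in the source engine, then materialize the
# result inside the destination engine so the join happens locally.
rows = src.execute("SELECT playerID FROM awards WHERE lgID = 'NL'").fetchall()
dst.execute("CREATE TEMP TABLE awards_nl (playerID TEXT)")
dst.executemany("INSERT INTO awards_nl VALUES (?)", rows)

# Semi-join entirely within the destination engine.
result = dst.execute(
    "SELECT yearID FROM batting "
    "WHERE playerID IN (SELECT playerID FROM awards_nl)"
).fetchall()
```

xorq does this movement for you via Arrow rather than row-by-row inserts, but the shape is the same: filter where the data lives, move only what the join needs.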

Caching & in-memory performance for fast iteration

Built-in performance optimizations such as caching and in-memory data transfer ensure fast execution of ML data flows, and even faster iteration as you develop and test.

  • In-memory, zero-copy data transfer across engines
  • Cache results from upstream query engines
  • Automatically invalidate cache when source data changes
  • Chain caches across multiple engines
import xorq as xo
from xorq.caching import SourceStorage

# Connect to source database
pg = xo.postgres.connect_env()
con = xo.connect()  # empty in-process connection

# Create source storage
storage = SourceStorage(source=con)

# Register table from postgres and cache it
batting = pg.table("batting")

# Cache the filtered data in the source backend
cached = (
    batting.filter(batting.yearID == 2015)
    .cache(storage=storage)  # cache expression
)

# Execute the query - results will be cached
result = xo.execute(cached)
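The invalidate-on-change behavior can be sketched in plain Python: keying the cache on a fingerprint of the source data means a changed source simply misses the old entry and recomputes (`SourceCache` here is a hypothetical illustration, not xorq's implementation):

```python
import hashlib

class SourceCache:
    """Toy cache keyed by a fingerprint of the source data, so any
    change in the source automatically invalidates the entry."""
    def __init__(self):
        self._store = {}

    def _key(self, name, source_rows):
        digest = hashlib.sha256(repr(source_rows).encode()).hexdigest()
        return (name, digest)

    def get_or_compute(self, name, source_rows, compute):
        key = self._key(name, source_rows)
        if key not in self._store:          # miss: compute and store
            self._store[key] = compute(source_rows)
        return self._store[key]

cache = SourceCache()
rows = [("a", 2015), ("b", 2014)]
first = cache.get_or_compute(
    "f2015", rows, lambda rs: [r for r in rs if r[1] == 2015])

# Changed source data -> different key -> recomputed, never stale.
rows2 = rows + [("c", 2015)]
second = cache.get_or_compute(
    "f2015", rows2, lambda rs: [r for r in rs if r[1] == 2015])
```

xorq applies the same principle at the expression level: the cache key incorporates the upstream data, so stale results are never served.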

Build powerful, portable UD(x)Fs

xorq provides the escape velocity you need to avoid the functional and performance limitations (and cost) of executing UDFs within your native query engines. Gain powerful data processing capabilities and run xorq UD(x)Fs on any platform.

  • Scalar UDFs with model integration
  • UDAF aggregation
  • UDWF windowing
  • Supports Snowflake, Trino, Pandas, DuckDB, and many others
import xorq as xo
from xorq.expr.ml import make_quickgrove_udf
from pathlib import Path
from xorq import _

t = xo.examples.diamonds.fetch()

model_path = Path(xo.options.pins.get_path("diamonds-model"))
model = make_quickgrove_udf(model_path, model_name="diamonds_model")
expr = t.mutate(pred=model.on_expr).filter(_.carat < 1).select(_.pred)
result = expr.execute()
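For a stdlib-only analogue of scalar UDFs and UDAFs, sqlite3's `create_function` and `create_aggregate` show the same shape: a per-row function and a step/finalize aggregate registered with the engine (the "model" and table here are toy stand-ins, not xorq's quickgrove integration):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE diamonds (carat REAL, price REAL)")
con.executemany("INSERT INTO diamonds VALUES (?, ?)",
                [(0.5, 1500.0), (0.9, 4000.0), (1.2, 7000.0)])

# Scalar UDF: a stand-in "model" scoring each row.
con.create_function("predict", 1, lambda carat: carat * 5000.0)

# UDAF: a custom aggregate defined as a step()/finalize() class.
class Mean:
    def __init__(self):
        self.total, self.count = 0.0, 0
    def step(self, value):
        self.total += value
        self.count += 1
    def finalize(self):
        return self.total / self.count if self.count else None

con.create_aggregate("mean_price", 1, Mean)

preds = con.execute(
    "SELECT predict(carat) FROM diamonds WHERE carat < 1"
).fetchall()
avg = con.execute("SELECT mean_price(price) FROM diamonds").fetchone()[0]
```

The difference with xorq is portability: the same UD(x)F definition runs unchanged across backends instead of being tied to one engine's registration API.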

ML pipelines as expressions

Compose end-to-end machine learning pipelines - from data fetching through prediction - into single, executable expressions. xorq handles the execution details, optimization, and caching of intermediate results.

  • Update the pipeline with new data
  • Modify individual steps without rewriting the whole pipeline
  • Cache and reuse expensive computations
  • Execute different parts of the pipeline on different engines
...

# Create composite expression out of other xorq UDFs
expr = transformed_test_data.mutate(
    predict_expr_udf.on_expr(transformed_test_data).name(prediction_key)
)
expr.execute()
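The pipeline-as-expression idea can be sketched in a few lines of plain Python: each step wraps its upstream node, and nothing runs until `execute()` walks the chain (`Expr` here is a hypothetical illustration of deferred composition, not xorq's API):

```python
# Each step is a deferred node holding a function and its upstream node;
# building the pipeline does no work, only execute() does.
class Expr:
    def __init__(self, fn, parent=None):
        self.fn, self.parent = fn, parent

    def then(self, fn):
        return Expr(fn, parent=self)

    def execute(self, data=None):
        upstream = self.parent.execute(data) if self.parent else data
        return self.fn(upstream)

pipeline = (
    Expr(lambda _: [1.0, 2.0, 3.0, 4.0])           # fetch
    .then(lambda xs: [x for x in xs if x > 1.5])   # transform
    .then(lambda xs: [x * 10 for x in xs])         # "predict"
)
result = pipeline.execute()
```

Because each step is just a node, you can swap one `then` without rebuilding the rest, and a real system like xorq can cache or route individual nodes to different engines.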

Simply Powerful Multi-Engine Scripting

Use Case Examples

Free xorq Training

Spend 30 minutes with xorq engineering to get on the fast path to better ML engineering.

Schedule Free Training