Hudi-rs with DuckDB, Polars, Daft, DataFusion — Single-node Lakehouse
The lakehouse architecture is gaining significant attention due to its promised flexibility and openness, enabled by open table formats like Apache Hudi, Apache Iceberg, and Delta Lake. These formats enable a flexible, open data architecture that allows enterprises to choose the compute engines best suited to their specific needs, freeing them from the limitations of proprietary warehouse solutions.
One important aspect of current lakehouse platforms is their predominantly distributed nature, designed to handle large-scale, complex data workloads. This typically requires setting up clusters with Spark, JVM, and other configurations to manage data in underlying storage systems. However, there is a growing need to query data directly from lakehouse formats in a single-node architecture for ad hoc queries, building analytical apps and other related use cases.
What is Hudi-rs?
In this blog, we will explore Hudi-rs, a native Rust library for Apache Hudi with Python bindings. This project opens up new possibilities for working with Hudi across a range of use cases that typically don’t require distributed processing.
For example, data science and machine learning applications often need frequent access to diverse datasets for exploratory data analysis and model training. Direct access to Hudi datasets for these use cases reduces the ‘wait time’ for data stakeholders, ultimately speeding up time-to-insight. Traditionally, accessing these tables involves configuring Java, Spark, Hadoop, and other related dependencies. Hudi-rs removes this friction and makes it easy for data consumers to get started.
Here are some of the highlights of the Hudi-rs 0.1.0 release:
- Copy-on-Write (CoW) table snapshots and time-travel reads
- Rust and Python APIs for retrieving table schemas and reading tables into Arrow RecordBatches (see the sketch after this list)
- Integration with Apache DataFusion
- Support for AWS, Azure, and GCP cloud storage schemes
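To make the second bullet concrete, here is a minimal Python sketch. The local table path is hypothetical, and the exact method names are assumptions based on my reading of the 0.1.0 Python API:

from hudi import HudiTable

# Point at a (hypothetical) local Copy-on-Write table
table = HudiTable("/tmp/trips_table")

# Retrieve the table schema
schema = table.get_schema()

# Read the latest snapshot as a list of Arrow RecordBatches
batches = table.read_snapshot()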
Hudi-rs in Action 🚀
Before we try out Hudi-rs, let’s take a moment to understand its internals, which will provide context for its integration with the downstream libraries. Our focus will be on the Python ecosystem here.
Hudi-rs uses Apache Arrow under the hood, which makes it easily compatible with other Arrow-native libraries and tools. Apache Arrow is a columnar in-memory data representation that can be utilized by any processing engine. Arrow is neither a storage nor an execution engine; it serves as a language-agnostic standard for in-memory processing and data transport. This means that when two systems “speak” Arrow, there’s no need for additional serialization and deserialization of data during transport, enabling seamless interoperability and reducing costs.
Hudi-rs scans the records from a Hudi table sitting in local storage or any cloud object store (such as S3) and emits them as Arrow RecordBatches.
A record batch is a collection of equal-length arrays that conform to a specific schema. It is a table-like data structure, essentially a sequence of fields, where each field is a contiguous Arrow array.
Once the records are in a RecordBatch, you can use them to construct an Arrow Table. Finally, having the data in an Arrow Table makes it easy for any downstream library or tool with an Arrow integration to consume it.
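To make the RecordBatch-to-Table relationship concrete, here is a minimal pyarrow sketch (the column names and values are made up purely for illustration):

import pyarrow as pa

# A RecordBatch: equal-length Arrow arrays bound to a schema
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array([22.5, 18.0, 35.0])],
    names=["trip_id", "fare"],
)

# One or more batches combine into an Arrow Table
table = pa.Table.from_batches([batch])
print(table.schema)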
Alright, now let’s see some of this in action.
DuckDB
DuckDB needs no introduction! It is simple, easy to get started with, and has a feature-rich SQL dialect for analytical workloads. Plus, there is a Python client.
DuckDB can query different types of Apache Arrow objects, including Arrow Tables, Datasets, Scanners, and RecordBatchReaders. This allows users to treat Arrow data structures as regular tables within DuckDB, enabling seamless querying and integration without additional serialization or deserialization steps.
Here is how you can query a Hudi table using DuckDB and Hudi-rs. First, let’s install hudi and duckdb, and import the required packages such as pyarrow to scan the records from the Hudi table and write them out as an Arrow Table.
pip install hudi
pip install duckdb
from hudi import HudiTable
import pyarrow as pa

# Point at the Hudi table in cloud object storage
hudi_table_cloud = HudiTable("s3://mybucket/lakehouse/hudidata/")

# Read the latest snapshot as a list of Arrow RecordBatches
records_cloud = hudi_table_cloud.read_snapshot()

# Combine the batches into a single Arrow Table
arrow_table = pa.Table.from_batches(records_cloud)
Now that we have the Hudi table records as an Arrow table, we can use DuckDB to query them as if they were regular tables within DuckDB.
import duckdb

con = duckdb.connect()
# DuckDB's replacement scans resolve the local variable `arrow_table` as a table
duck_results = con.execute("SELECT * FROM arrow_table").arrow()
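Because arrow_table behaves like any other table, you can also push richer SQL at it. A small sketch, assuming a hypothetical fare column in the table's schema:

# Aggregate directly over the Arrow-backed table (hypothetical `fare` column)
agg = con.execute("""
    SELECT count(*) AS trips, avg(fare) AS avg_fare
    FROM arrow_table
    WHERE fare > 20.0
""").arrow()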
As simple as that!
Here are the results (I used DuckDB’s Python client inside Jupyter).
You can then bring any visualization tool or related libraries to carry on with the downstream analytical tasks.
Polars
Polars is a DataFrame library written in Rust with a powerful query optimizer and vectorized execution for optimal performance. It also supports lazy evaluation, which lets us build a query plan without executing it immediately.
Polars provides seamless integration with Apache Arrow, enabling the creation of a Polars DataFrame directly from Arrow tables. This integration is provided through the polars.from_arrow function, which accepts Arrow tables, record batches, and arrays. For our use case, since we already have the Hudi records as the Arrow table arrow_table, let’s use this method to query the data.
import polars as pl

# Convert the Arrow Table into a Polars DataFrame (typically zero-copy)
pl.from_arrow(arrow_table)
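Since Polars supports lazy evaluation, the same data can also be queried through a lazy plan. A quick sketch, again assuming a hypothetical fare column:

# Build a lazy plan; nothing executes until collect()
lazy_result = (
    pl.from_arrow(arrow_table)
    .lazy()
    .filter(pl.col("fare") > 20.0)
    .collect()
)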
Here is the output.
Daft
Daft has quickly become one of my go-to DataFrame libraries. It is a distributed query engine that offers a familiar Python DataFrame API, designed for tasks such as ETL, analytics, and machine learning at scale. It operates both locally and on distributed clusters.
Daft actually already has a direct integration with Apache Hudi (via pyhudi), making it one of the first Python DataFrame libraries to enable this functionality. You can read more about the integration and see a hands-on use case through the link below. However, now that we have hudi-rs, we will replace the pyhudi module in Daft with the hudi dependency.
For the purpose of this blog, let’s explore how we can consume the Arrow Table object to create a Daft dataframe.
import daft

# Create a Daft DataFrame directly from the Arrow Table
daft_df = daft.from_arrow(arrow_table)
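From here, the usual Daft DataFrame operations apply. A hedged sketch of a filter-and-show, once more assuming a hypothetical fare column:

# Daft is lazy; execution happens on .show() / .collect()
daft_df.where(daft.col("fare") > 20.0).show()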
Here is the output dataframe.
Apache DataFusion [Rust]
Apache DataFusion is a fast, extensible query engine for building high-quality data-centric systems in Rust, using Apache Arrow. As part of the very first release, hudi-rs has already been integrated with DataFusion, which means you can read data from Hudi tables directly using DataFusion's execution engine. Here is some sample code to do so.
use std::sync::Arc;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use hudi::HudiDataSource;
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Time-travel read: query the table as of the given commit timestamp
    let hudi = HudiDataSource::new_with_options("/tmp/trips_table",
        [("hoodie.read.as.of.timestamp", "20240718095200206")]).await?;
    ctx.register_table("trips_table", Arc::new(hudi))?;
    let df = ctx.sql("SELECT * from trips_table where fare > 20.0").await?;
    df.show().await?;
    Ok(())
}
In this blog, we’ve explored how to use hudi-rs, a native Rust library for Apache Hudi, to work directly with lakehouse table formats without the dependency on JVM, Spark, or Hadoop. This unlocks new possibilities to use Hudi for a variety of use cases within the Python and Rust ecosystems.
If this piques your interest, join us in tackling some of the most critical challenges in the lakehouse space.
⭐️ Check out the repo (leave a star if you like it!)
🔗 Start Contributing — https://github.com/apache/hudi-rs/contribute
📱 Join our Slack Channel (#hudi-rs) — https://join.slack.com/t/apache-hudi/shared_invite/zt-2ggm1fub8-_yt4Reu9djwqqVRFC7X49g