Using Apache Hudi & Iceberg tables in Databricks with Apache XTable
Open table formats such as Apache Hudi, Delta Lake, and Apache Iceberg have enabled the open lakehouse architecture by providing a foundation for storing data openly and the flexibility to use the right compute for the right workload. These formats store data in cloud data lakes such as Amazon S3, Microsoft Azure Blob Storage, and Google Cloud Storage, allowing organizations to keep control of their data within their own secure environments.
While this paradigm brings solid advantages, an emerging challenge we see as more organizations productionize their lakehouses is the difficulty of interoperating between these three table formats within a single ecosystem. Organizations find themselves frequently migrating or copying data to make it compatible across formats, a process that is both costly and inefficient.
Launched last year, the Apache XTable (incubating) project is an open-source effort that targets interoperability among different lakehouse table formats. XTable is not a new table format but functions as a translation layer that facilitates the seamless conversion of metadata between various open table formats. This functionality is based on the premise that the three primary table formats — Apache Hudi, Delta Lake, and Apache Iceberg — utilize Parquet for data storage and share similarities in their metadata structures, which include details about partitions, schema, column-level statistics, row counts, and file sizes.
XTable reads a source table format’s metadata, capturing all this information, and translates it into the target table format, thereby allowing users to access their data in the preferred format and compute engine, regardless of the original table format in which the data was written. This eliminates the need for data rewriting or duplication.
In this hands-on blog, we will explore a simple example to demonstrate how we can use XTable to seamlessly interoperate among open table formats within the Databricks environment. We will use Databricks notebooks available as part of the community edition for this demonstration. The notebook can be downloaded from here.
Scenario
Let’s consider a team of data scientists using Databricks notebooks to develop their machine learning workflows. These notebooks provide an easy start with Apache Spark (their primary compute engine) and support real-time co-authoring, automatic versioning, and data visualization. They also come with integrated MLflow, making it easier to track and run machine learning experiments. Typically, this team relies on a centralized data platform engineering team to supply data in the Delta Lake table format, which is advantageous for machine learning experiments thanks to features like schema evolution, time travel, and data versioning. MLflow automatically logs the results of each experiment tied to a notebook, enabling the data scientists to efficiently track their experiments and manage the associated artifacts.
While this team primarily works with Delta Lake tables, certain experiments require access to datasets stored by other teams in other formats, such as Apache Hudi and Apache Iceberg, to build robust ML models. Historically, the data platform team has either duplicated these datasets by rewriting them as Delta Lake or conducted one-off migrations. These methods have proven costly and time-consuming, often resulting in stale data and unmanageable copies.
How does Apache XTable help?
To address this problem of interoperability between table formats, the data platform team plans to adopt Apache XTable. Their end goal is straightforward: users, in this case data scientists, should not be limited to Delta datasets but should have an easy way to read data in any table format, with that abstraction handled for them and without having to configure other formats or additional dependencies.
So, irrespective of whether the source table is Hudi, Delta Lake or Iceberg, users can just read the dataset as if it were a Delta Lake table using
spark.read.format("delta").load("/mnt/mydata/<table_name>")
Hands-on Example
Let’s say a team uses Apache Hudi as their lakehouse table format. Hudi shines in low-latency scenarios, such as streaming workloads; its advantage is support for incremental data processing and indexing, which enables quicker upserts and deletes in data lakes. The table, named churn_data, is stored in the S3 data lake under the churn_hudi/ folder, as shown in the image below.
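For context, a table like this is typically produced with Spark's Hudi datasource. The following is a minimal, illustrative sketch; the source DataFrame churn_df and the record key and precombine columns are assumptions rather than details from the original pipeline.

# Illustrative sketch only: churn_df, "customer_id", and "event_ts" are assumed names.
hudi_options = {
    "hoodie.table.name": "churn_data",
    "hoodie.datasource.write.recordkey.field": "customer_id",  # assumed record key column
    "hoodie.datasource.write.precombine.field": "event_ts",    # assumed ordering column
    "hoodie.datasource.write.operation": "upsert",
}

(churn_df.write
    .format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("s3a://diplakehouse/churn_hudi/"))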
Now, the other data science team wants to build a binary classification model using this dataset to analyze customer churn behavior. They also plan to run a series of experiments to identify the features that enhance their model robustness so they can deploy it in production. As mentioned earlier, this dataset, currently in a Hudi table format, is not accessible to them since they work exclusively with Delta Lake tables in Databricks notebooks. To bridge this gap, we will employ XTable to translate the metadata from Hudi to Delta Lake.
To begin using Apache XTable, start by cloning the GitHub repository to your local environment and compile the required jars with Maven. Execute the following command to initiate the build:
mvn clean package
For detailed installation instructions, refer to the official documentation.
Once the build is successful, we will use the utilities-0.1.0-SNAPSHOT-bundled.jar to start the metadata translation process.
Next, create a configuration file named my_config.yaml in the cloned XTable directory. This file defines the details of the translation and is structured as follows:
sourceFormat: HUDI
targetFormats:
  - DELTA
datasets:
  - tableBasePath: s3://diplakehouse/churn_hudi/
    tableName: churn_data
This configuration specifies the source format (Hudi), the target format (Delta Lake), and the dataset details such as the base path and table name on S3.
To initiate the translation process, execute this command:
java -jar utilities/target/utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml
Upon successful completion of the sync process, we can see the output as shown below.
Let’s quickly check the S3 data lake now.
We now have the Delta metadata, known as the transaction log, under the _delta_log/ directory. These files contain important information, including schema definitions, commit history, partitioning details, and column statistics.
Here’s a snippet of what we see in the transaction log regarding the file-level statistics.
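If you want to inspect these statistics programmatically rather than through a console screenshot, a minimal sketch along the following lines works; it assumes boto3 is available with credentials for the diplakehouse bucket and that we are looking at the first commit file.

import json
import boto3

s3 = boto3.client("s3")
obj = s3.get_object(
    Bucket="diplakehouse",
    Key="churn_hudi/_delta_log/00000000000000000000.json",
)

# The transaction log is newline-delimited JSON; each 'add' action registers a data file
# and carries its column-level statistics as a JSON string under 'stats'.
for line in obj["Body"].read().decode("utf-8").splitlines():
    action = json.loads(line)
    if "add" in action:
        print(action["add"]["path"])
        print(action["add"].get("stats"))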
Now that we have the source Hudi table metadata translated to Delta Lake, we can go to the Databricks Notebook environment and start working on the model.
First, let’s mount the S3 data lake to DBFS (Databricks File System). This allows users to interact with files stored remotely as if they were local to the Databricks environment, and it is particularly useful for accessing large datasets in cloud storage without duplicating data into the Databricks environment.
dbutils.fs.mount(
  source = "s3a://diplakehouse",
  mount_point = "/mnt/mydata",
  extra_configs = {"fs.s3a.access.key": "<access-key>", "fs.s3a.secret.key": "<secret-key>"}
)
If we check the contents of our data lake, this is what we have.
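For example, listing the mount point from the notebook:

# List the folders now visible under the mount point.
display(dbutils.fs.ls("/mnt/mydata"))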
The table of interest is the Hudi table in the churn_hudi directory. Since XTable has now translated this table, we can read it as a regular Delta table.
delta_table = spark.read.format("delta").load("/mnt/mydata/churn_hudi")
To make this table available in a metastore, let’s register it. Databricks provides a built-in Hive metastore that comes integrated with Spark.
spark.sql("""
CREATE TABLE churn_hudi
USING DELTA
LOCATION '/mnt/mydata/churn_hudi'
""")
If we now query the table, we can see the Hudi metafields along with the dataset’s records.
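For instance, a quick query like the one below surfaces a few of Hudi's standard metadata columns alongside the data; Churn is a column from this example's dataset.

spark.sql("""
    SELECT _hoodie_commit_time, _hoodie_record_key, Churn
    FROM churn_hudi
    LIMIT 5
""").show(truncate=False)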
Now, for this particular experiment, we will drop some of the features and train our model to see how accuracy improves. From our exploratory data analysis and feature selection efforts, we identified that certain features are highly correlated while others are non-essential.
ALTER TABLE churn_hudi DROP COLUMNS (State, Area_code, Total_day_calls, Total_day_minutes, Total_day_charge, Total_eve_calls, Total_eve_minutes, Total_eve_charge, Total_night_calls, Total_night_minutes, Total_night_charge)
To see the Delta table history, we can use the DESCRIBE HISTORY command, which presents all the transactions that have happened on this table.
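For example, from the notebook:

# Show the full Delta transaction history for the registered table.
display(spark.sql("DESCRIBE HISTORY churn_hudi"))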
This can be extremely useful for doing time travel operations and training ML models on specific versions of the dataset. For example, training a new model on the original unaltered dataset.
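A minimal sketch of such a time-travel read, assuming version 0 is the original, unaltered dataset:

# Read the table as of an earlier version for reproducible training.
df_original = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load("/mnt/mydata/churn_hudi")
)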
Now, let’s train the churn classification model using the new dataset (with only the important features).
from sklearn.model_selection import train_test_split

df_telco_new = spark.read.table("default.churn_hudi").toPandas()

# Encode the categorical columns as 0/1
df_telco_new['International_plan'] = df_telco_new['International_plan'].replace(['No', 'Yes'], [0, 1])
df_telco_new['Voicemail_plan'] = df_telco_new['Voicemail_plan'].replace(['No', 'Yes'], [0, 1])
df_telco_new['Churn'] = df_telco_new['Churn'].replace(['FALSE', 'TRUE'], [0, 1])

# Prepare the feature matrix and the target (Churn is the last column)
target = df_telco_new.iloc[:, -1].values
features = df_telco_new.iloc[:, :-1].values

X_train, X_test, y_train, y_test = train_test_split(features, target, test_size=0.20, random_state=101)
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

mlflow.sklearn.autolog()

with mlflow.start_run():
    # Set the model parameters.
    n_estimators = 600

    # Create and train the model.
    rfc = RandomForestClassifier(n_estimators=n_estimators)
    rfc.fit(X_train, y_train)

    # Evaluate on the held-out test set.
    predictions = rfc.predict(X_test)
    acc = accuracy_score(y_test, predictions)
    classReport = classification_report(y_test, predictions)
    confMatrix = confusion_matrix(y_test, predictions)

    print('Evaluation of the trained model:')
    print('Accuracy :', acc)
    print('Confusion Matrix :\n', confMatrix)
    print('Classification Report :\n', classReport)
Since MLflow is already enabled for our experiment, all of the model artifacts and information can now be viewed from the ‘Experiments’ tab.
Here is a quick overview of the experiment.
We can also analyze the model’s key metrics, such as accuracy on the test set, recall, and log loss, in the ‘Model Metrics’ view.
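The same information can also be pulled programmatically with mlflow.search_runs(); here is a minimal sketch, assuming the notebook's experiment is the active one and noting that the exact metric column names depend on what autologging recorded.

import mlflow

# Fetch the most recent runs of the active experiment as a pandas DataFrame.
runs = mlflow.search_runs(max_results=5)

# Autologged metrics appear as 'metrics.<name>' columns.
print(runs.filter(like="metrics.", axis=1))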
In this blog, we demonstrated a practical example of how Databricks users can use XTable to interoperate between Apache Hudi, Iceberg, and Delta Lake formats and build ML workflows. Previously restricted to only Delta tables, the data science team can now swiftly switch between the three table formats within their Databricks environment. This interoperability not only simplifies analytical workflows by centralizing metadata management in a single S3 data lake but also reduces costs and ensures data is accessible across formats.
Join us for an in-depth exploration of Apache XTable at the Data+AI Summit 2024 on June 13th, 2024. We’ll delve into the capabilities and architecture of XTable, providing a comprehensive technical overview.
Resources
- Apache XTable website: https://xtable.apache.org/
- Apache XTable repo: https://github.com/apache/incubator-xtable
- What is Apache XTable Blog: https://dipankar-tnt.medium.com/onetable-interoperability-for-apache-hudi-iceberg-delta-lake-bb8b27dd288d