Python is the language of AI, but Python is a second-class citizen in existing lakehouses. This article describes the Hopsworks Query Service, built on Apache Arrow, which reads data from the lakehouse to Python clients at very high speed compared to existing lakehouses that bolt on JDBC/ODBC access for their Python clients. We describe the results of open-source benchmarks, published at the peer-reviewed, industry-leading SIGMOD 2024 conference, showing that Hopsworks can query lakehouse tables, returning Pandas DataFrames, at between 10-40X the throughput of Databricks, BigQuery, and SageMaker. The Hopsworks Query Service provides a fast feedback loop for Data Scientists working with lakehouse data, and is one of the building blocks of the Hopsworks AI Lakehouse.
This article is part 5 in a 7-part series describing, in lay terms, concepts and results from a SIGMOD 2024 research paper on the Hopsworks Feature Store.
Other parts: 1 (Modular/Composable AI Systems), 2 (The Taxonomy for Data Transformations), 3 (Use all features: Snowflake Schema), 4 (Lakehouse for AI), 6 (Real-Time AI Database), 7 (Reproducible Data).
Hopsworks enables a Data Scientist, without deep SQL or data modeling skills, to create, query, join, and maintain tables in a lakehouse and then run temporal queries to read point-in-time correct data for model training and inference. Underpinning these capabilities is a dedicated query engine, powered by Arrow, that enables Data Scientists to iterate faster when working with Lakehouse tables - Apache Iceberg, Delta Lake, and Apache Hudi. This makes your Data Scientists both happier and more productive when working with Lakehouse data.
But why do we need a dedicated query engine for Lakehouse tables? In a previous article, we explained why temporal joins are required to read data for both training and inference (to prevent future data leakage or stale data). Specifically, these joins are ASOF LEFT JOIN queries, and the clients of the queries are Python programs that use query results to train models and perform batch inference for analytical models, see figure 2. Apache Spark, the workhorse of the Lakehouse, doesn't support ASOF LEFT JOINs, and even when you implement ASOF LEFT JOINs in Spark with state tables, its performance is poor. For this reason, we developed a Spark query operator to support temporal joins in Spark, which improves performance, but it still can't return training data or batch inference data directly to Python clients - it materializes training/inference data as files that clients then have to read.
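To make the semantics concrete: an ASOF LEFT JOIN matches each left row with the most recent right row for the same key at or before the left row's timestamp, keeping every left row even when no match exists. A minimal, stdlib-only Python sketch of this point-in-time correct behavior (the function and data names are illustrative, not the Hopsworks API):

```python
import bisect

def asof_left_join(left, right):
    """For each (key, ts) row in `left`, attach the value from the most
    recent `right` row with the same key and timestamp <= ts, so no
    future data leaks into the result."""
    # Index right rows per key, with timestamps kept sorted.
    by_key = {}
    for key, ts, value in sorted(right, key=lambda r: r[1]):
        ts_list, values = by_key.setdefault(key, ([], []))
        ts_list.append(ts)
        values.append(value)
    out = []
    for key, ts in left:
        ts_list, values = by_key.get(key, ([], []))
        i = bisect.bisect_right(ts_list, ts) - 1  # last right ts <= left ts
        out.append((key, ts, values[i] if i >= 0 else None))  # LEFT: keep row
    return out

labels = [("user_1", 100), ("user_1", 205)]             # label events
features = [("user_1", 90, 0.4), ("user_1", 200, 0.7)]  # feature updates
print(asof_left_join(labels, features))
# [('user_1', 100, 0.4), ('user_1', 205, 0.7)]
```

Note that the feature value observed at time 200 is joined to the label at time 205, never to the earlier label at time 100 - that is exactly the leakage the temporal join prevents.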
For these reasons, we developed a dedicated query service for lakehouse tables that returns Arrow data to Python clients (without materializing it as files), provides highly optimized ASOF LEFT JOINs, and works natively with Arrow data all the way from the lakehouse tables through the query engine to the client. It even supports federated queries across external tables in Snowflake and BigQuery. Existing lakehouses, however, only provided JDBC/ODBC APIs for Python clients to query data (as of the time of our research paper). The performance difference is an order of magnitude or more, demonstrating the need for dedicated AI services in the Lakehouse. We need an AI Lakehouse.
Data scientists want to use Python to read data from lakehouse tables. That is not a high-performance capability of existing data warehouses, which (as of mid-2024) mostly provide JDBC/ODBC APIs for reading data into Python clients. Data from the lakehouse is first read by the JDBC/ODBC server, then transposed from columnar to row format, then serialized, and then sent to the Pandas/Polars client, which deserializes the data and transposes it back from row-oriented to columnar format. The whole process introduces large bottlenecks that make using a data warehouse directly from Python an often painful process, negatively affecting the productivity and happiness of Data Scientists.
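The round trip above can be sketched in a few lines: the server transposes columns to rows and serializes them, and the client deserializes and transposes back. Both transpositions and both (de)serialization steps are pure overhead that an Arrow-native path avoids. A stdlib-only illustration (JSON stands in for the real wire format, which this sketch does not claim to reproduce):

```python
import json

# Columnar data as it sits in the lakehouse / in a DataFrame.
columns = {"user_id": [1, 2, 3], "amount": [9.5, 3.2, 7.1]}

# JDBC/ODBC server side: transpose to rows, then serialize.
names = list(columns)
rows = list(zip(*columns.values()))                    # columnar -> rows
wire = json.dumps([names] + [list(r) for r in rows])   # serialize

# Python client side: deserialize, then transpose back to columns.
payload = json.loads(wire)                             # deserialize
names2, rows2 = payload[0], payload[1:]
columns2 = {n: [r[i] for r in rows2] for i, n in enumerate(names2)}

# Four extra steps just to end up with the columns we started with.
assert columns2 == columns
```

With Arrow on both sides, the columnar buffers can be sent as-is, so none of these four steps is needed.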
The Hopsworks Query Service is built to support high-performance querying of feature data stored in a lakehouse, as well as the high-performance creation of training data as files. When a Python client issues an API call, the call is transpiled, with the help of Apache Calcite, into a SQL query for our query engine built on DuckDB.
The Hopsworks Query Service can also choose to use Spark to perform the query, depending on the query at hand. For queries on data that fits on a single machine, DuckDB is the engine of choice; although it supports out-of-core processing, it does not scale to run on multiple servers. The Spark-based query engine can be used to save training data as files in any user-provided cloud bucket or file system path, and it scales to create training data many TBs in size. Both the DuckDB and Spark engines perform the temporal joins that create training data: DuckDB natively supports ASOF LEFT JOINs, while our Spark engine implements the ASOF LEFT JOIN using state tables.
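The engine choice just described boils down to a dispatch on the requested sink and the estimated data size. A minimal sketch, with a hypothetical threshold that is not a Hopsworks constant:

```python
def pick_engine(estimated_bytes, sink):
    """Route a query: DuckDB for single-node reads returning DataFrames,
    Spark for large jobs or jobs that materialize training data as files.
    The size limit is illustrative only."""
    SINGLE_NODE_LIMIT = 100 * 2**30  # ~100 GiB, hypothetical
    if sink == "files" or estimated_bytes > SINGLE_NODE_LIMIT:
        return "spark"    # scales out; writes TB-scale training data
    return "duckdb"       # in-process, Arrow-native, out-of-core capable

assert pick_engine(5 * 2**30, "dataframe") == "duckdb"
assert pick_engine(2 * 2**40, "files") == "spark"
```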
In figure 3, we can see how the Hopsworks Query Service provides an API for Python clients, which receive data via the Arrow Flight protocol, avoiding any unnecessary serialization/deserialization and row-to-column or column-to-row transpositions of the data.
The following benchmarks are from our peer-reviewed paper at SIGMOD Industrial 2024. That means they are not typical vendor benchmarks, but realistic benchmarks reviewed by our peers in industry and research.
In figure 4, we can see the performance of the Hopsworks Query Service reading data from Hudi tables as Arrow data and performing temporal joins using DuckDB, before returning data to clients over the Arrow Flight protocol. In contrast, other lakehouses (as of paper publication) provided JDBC/ODBC APIs that let Python clients read training data as DataFrames, requiring pivoting of the data from columnar to row format and back again, as well as serialization and deserialization of the data for transfer over the network. The result is that Hopsworks has over an order of magnitude higher performance than competing feature stores.
Hopsworks Feature Store provides a pluggable offline store: you can store feature groups in any of the many supported data warehouses as an offline store, on a per-table basis. For example, you could have 50 tables in the Hopsworks offline store, 30 tables in Snowflake, and 20 tables in BigQuery. Hopsworks treats all of these as feature groups and supports generating training data and inference data from them using federated queries. The Hopsworks Query Service supports federated queries as an efficient and performant way to join features from different offline stores, such as Snowflake, BigQuery, and Hopsworks (without data duplication). As more data warehouses add Arrow-native query APIs, such as ADBC, federated queries will become even more performant.
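A federated read can be pictured as fetching each feature group from its own store and merging the results on the entity key inside the query engine, rather than copying tables between warehouses. A stdlib-only sketch with hypothetical store contents (the real engine joins Arrow tables, not Python dicts):

```python
def federated_join(*sources):
    """Merge per-store feature dicts on the entity key.
    Each source maps key -> {feature: value}; each store contributes
    its own columns, and rows missing from a store simply lack them."""
    keys = set().union(*(s.keys() for s in sources))
    out = {}
    for k in sorted(keys):
        row = {}
        for s in sources:
            row.update(s.get(k, {}))
        out[k] = row
    return out

# Feature groups living in three different offline stores (toy data).
hopsworks = {"u1": {"clicks_7d": 12}, "u2": {"clicks_7d": 3}}
snowflake = {"u1": {"ltv": 250.0}}
bigquery  = {"u1": {"churn_risk": 0.1}, "u2": {"churn_risk": 0.6}}
print(federated_join(hopsworks, snowflake, bigquery))
```

Only the rows needed for the join cross the network; no table is duplicated into a single warehouse first.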
In this article, we covered how Hopsworks generates temporal queries from Python code, and how a native query engine built on Arrow can massively outperform the JDBC/ODBC APIs provided by existing lakehouses. Data scientists need to use lakehouse data for both training and inference, and we have built native support for their needs in the Hopsworks AI Lakehouse.