A feature orchestration system, commonly known as a feature pipeline, coordinates the computation of features before writing them to feature groups. The two most common kinds of feature pipeline are batch feature pipelines and streaming feature pipelines.
Streaming feature pipelines consume data that is generated continuously. Hopsworks provides tools and interfaces that enable such applications to perform high-throughput, low-latency feature computations. This also enables clients to retrieve precomputed, real-time features with millisecond-level latency.
When you need precomputed features that are fresh to the second or even the millisecond, a streaming feature pipeline can create features from events in your messaging system that are only a few milliseconds or seconds old. Below we showcase a streaming feature pipeline built with Apache Beam, a per-event data processing framework.
Apache Beam is a powerful and flexible unified programming model for batch and streaming data processing pipelines. Beam feature pipelines for the Hopsworks Feature Store are supported in Java/Scala. Hopsworks Feature Store expects your computed features to be encapsulated in the org.apache.beam.sdk.values.Row class, with the same schema as the feature group you are writing to. The feature group must also have the streaming API enabled in order to write computed features from Beam.
Once these requirements are satisfied, you can use any operation available in Apache Beam to compute features and write them to a Hopsworks feature group. For example, imagine you want to compute New York City taxi ride features. The schema of your feature group could look as follows:
root
|-- ride_id: string (nullable = true)
|-- ride_status: string (nullable = true)
|-- point_idx: integer (nullable = true)
|-- longitude: double (nullable = true)
|-- latitude: double (nullable = true)
|-- meter_reading: double (nullable = true)
|-- meter_increment: double (nullable = true)
|-- passenger_count: integer (nullable = true)
Hopsworks expects your features to be encapsulated in an org.apache.beam.sdk.values.Row with a matching Beam schema:
Schema schema =
Schema.of(
Field.nullable("ride_id", FieldType.STRING),
Field.nullable("ride_status", FieldType.STRING),
Field.nullable("point_idx", FieldType.INT32),
Field.nullable("longitude", FieldType.DOUBLE),
Field.nullable("latitude", FieldType.DOUBLE),
Field.nullable("meter_reading", FieldType.DOUBLE),
Field.nullable("meter_increment", FieldType.DOUBLE),
Field.nullable("passenger_count", FieldType.INT32)
);
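Given this schema, each event your pipeline produces must be encoded as a Beam Row whose values follow the schema's field order. The sketch below shows one way to construct such a Row; the field values are illustrative, not taken from a real dataset.

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;

public class TaxiRideRowExample {
  public static void main(String[] args) {
    // Same schema as the feature group above
    Schema schema =
        Schema.of(
            Field.nullable("ride_id", FieldType.STRING),
            Field.nullable("ride_status", FieldType.STRING),
            Field.nullable("point_idx", FieldType.INT32),
            Field.nullable("longitude", FieldType.DOUBLE),
            Field.nullable("latitude", FieldType.DOUBLE),
            Field.nullable("meter_reading", FieldType.DOUBLE),
            Field.nullable("meter_increment", FieldType.DOUBLE),
            Field.nullable("passenger_count", FieldType.INT32));

    // One taxi ride event as a Row; addValues takes values in schema order.
    Row ride =
        Row.withSchema(schema)
            .addValues("ride-0001", "enroute", 42, -73.98, 40.75, 12.5, 0.03, 2)
            .build();

    System.out.println(ride.getString("ride_status"));
  }
}
```

Rows built this way (typically inside a DoFn or via JsonToRow, as shown later) are what featureGroup.insertStream() consumes.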
To write real-time features to the feature group, you first need a reference to the feature group object. Note that Hopsworks currently doesn't support registering feature group metadata from the Beam application, so the feature group must be pre-registered before you can write real-time features computed by Beam:
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group
StreamFeatureGroup featureGroup = fs.getStreamFeatureGroup(featureGroupName, featureGroupVersion);
Then define the Beam pipeline and the logic to compute the features:
// define the pipeline and the feature computation logic
Pipeline p = Pipeline.create(options);
p.apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
 .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(schema))
 .apply("Insert into stream feature group", featureGroup.insertStream());
p.run();
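The snippet above reads the Pub/Sub topic name from an options object. A minimal sketch of such an options interface is shown below; the name TaxiRideOptions and the inputTopic option are assumptions for illustration, following the standard Beam PipelineOptions pattern.

```java
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical options interface supplying the Pub/Sub input topic.
public interface TaxiRideOptions extends PipelineOptions {
  @Description("Pub/Sub topic to read taxi ride events from")
  String getInputTopic();

  void setInputTopic(String value);
}
```

It would typically be instantiated from command-line arguments, e.g. `TaxiRideOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(TaxiRideOptions.class);`.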
An Apache Beam real-time feature pipeline produces a continuous stream of features, which Hopsworks syncs to the online feature store. The online feature store keeps only the latest value per primary key. To save historical data for batch analysis or model training, you need to start the offline materialization job. In Hopsworks, each stream-enabled feature group has a dedicated offline materialization job named:
[FEATURE_GROUP_NAME]_[FEATURE_GROUP_VERSION]_offline_fg_materialization.
You can start this job from the Feature Store UI or schedule it for periodic synchronization with the offline store.