Hopsworks Product Capabilities

Feature Engineering in Flink

A feature pipeline is a program that orchestrates the computation of features that are subsequently written to one or more feature groups. Generally, there are two types of feature pipelines: batch feature pipelines, which run on a schedule, and streaming feature pipelines, which run continuously.

A streaming feature pipeline is a program that continuously processes incoming data, computes features, and writes those features to a feature store. Hopsworks provides APIs that enable stream processing applications to write features with low latency and high throughput, and clients to read fresh (real-time precomputed) features with low latency.

If you need very fresh precomputed features in your feature store, a streaming feature pipeline can create features from events in your message bus that are only a second or so old. In this post, we describe a streaming feature pipeline built with the per-event stream processing framework Apache Flink.

Apache Flink is a powerful and flexible framework for stateful computations over unbounded and bounded data streams. Flink is the open-source framework of choice for feature engineering when you need very fresh features computed in real-time. Flink provides a rich set of operators and functions, such as time windows and aggregation operations, that can be applied to individual data elements (keyed streams) or grouped data sets (global windows). Flink’s stateful operations enable it to maintain and update state across multiple data records or events, which is particularly useful for feature engineering tasks such as sessionization or maintaining rolling aggregates over a sliding window of data.
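
To make the idea of keyed, stateful aggregation concrete, here is a plain-Java sketch (not Flink code, and with hypothetical card numbers) of the kind of per-key running state that Flink maintains for you: one aggregate per credit card number, updated event by event.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of keyed, stateful aggregation: one running
// aggregate per credit card number, updated event by event. In a real
// pipeline, Flink's keyed streams manage this state for you.
class KeyedRollingAggregate {

    // Per-key state: a running count and mean of transaction amounts.
    static final class Stats {
        long count;
        double mean;

        void add(double amount) {
            count++;
            mean += (amount - mean) / count; // incremental mean update
        }
    }

    final Map<Long, Stats> stateByKey = new HashMap<>();

    // Called once per incoming transaction event.
    void onEvent(long ccNum, double amount) {
        stateByKey.computeIfAbsent(ccNum, k -> new Stats()).add(amount);
    }

    Stats get(long ccNum) {
        return stateByKey.get(ccNum);
    }

    public static void main(String[] args) {
        KeyedRollingAggregate agg = new KeyedRollingAggregate();
        agg.onEvent(4444L, 10.0);
        agg.onEvent(4444L, 20.0);
        agg.onEvent(5555L, 5.0); // a different key keeps separate state
        System.out.println(agg.get(4444L).count + " " + agg.get(4444L).mean); // prints "2 15.0"
    }
}
```

In Flink, the equivalent state lives inside the framework (partitioned by key and fault-tolerant), so you only write the update logic.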

Flink feature pipelines for the Hopsworks Feature Store are supported in Java/Scala. The Hopsworks Feature Store expects computed features to be encapsulated in a POJO class with the same schema as the feature group you are writing to. In database terms, this POJO class corresponds to one row. The Streaming API must be enabled for the feature group in order to write computed features from Flink.

If the above requirements are satisfied, you can use any operation available in Apache Flink to compute features and write them to a Hopsworks feature group. For example, imagine you want to compute aggregations over a 10-minute window, such as the number of transactions and the average and standard deviation of the amount transacted per credit card. The schema for your feature group could look as follows:

 root
    |-- cc_num: long (nullable = true)
    |-- num_trans_per_10m: long (nullable = true)
    |-- avg_amt_per_10m: double (nullable = true)
    |-- stdev_amt_per_10m: double (nullable = true)

Hopsworks expects your features to be encapsulated in the following POJO class:

 public class CreditCardFg {
  Long cc_num;
  Long num_trans_per_10m;
  Double avg_amt_per_10m;
  Double stdev_amt_per_10m;

  public Long getCcNum() {
    return cc_num;
  }
  public void setCcNum(Long value) {
    this.cc_num = value;
  }
  public Long getNumTransPer10m() {
    return num_trans_per_10m;
  }
  public void setNumTransPer10m(Long value) {
    this.num_trans_per_10m = value;
  }
  public Double getAvgAmtPer10m() {
    return avg_amt_per_10m;
  }
  public void setAvgAmtPer10m(Double value) {
    this.avg_amt_per_10m = value;
  }
  public Double getStdevAmtPer10m() {
    return stdev_amt_per_10m;
  }
  public void setStdevAmtPer10m(Double value) {
    this.stdev_amt_per_10m = value;
  }
}

To write real-time features to the feature group, you first need to get a reference to the feature group object. Note that Hopsworks currently doesn’t support registering feature group metadata from a Flink application, so the feature group needs to be pre-registered before you can write real-time features computed by Flink:

 // get a reference to the feature store
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
    
// get reference to feature group
StreamFeatureGroup featureGroup = fs.getStreamFeatureGroup(featureGroupName, featureGroupVersion);

Then, you write the feature engineering logic to compute your near real-time features:

 // aggregate the stream and return a DataStream of POJO objects
DataStream<CreditCardFg> computedFeatures = ...
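
The elided aggregation could, for instance, be a keyed 10-minute window with an aggregate function. As a hedged illustration, the following plain-Java sketch mirrors the createAccumulator/add/getResult contract of Flink's AggregateFunction and computes the three features of the schema above; a real pipeline would implement org.apache.flink.api.common.functions.AggregateFunction and emit CreditCardFg objects.

```java
// Sketch of the accumulator logic a windowed aggregation could use to
// fill the CreditCardFg schema: transaction count, average amount, and
// standard deviation of the amount. Plain Java for illustration only.
class WindowAggSketch {

    static final class Acc {
        long count;
        double sum;
        double sumSq;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    static Acc add(double amount, Acc acc) {
        acc.count++;
        acc.sum += amount;
        acc.sumSq += amount * amount;
        return acc;
    }

    // Produces the three window features: count, average, sample stdev.
    static double[] getResult(Acc acc) {
        double avg = acc.count > 0 ? acc.sum / acc.count : 0.0;
        double var = acc.count > 1
            ? (acc.sumSq - acc.count * avg * avg) / (acc.count - 1)
            : 0.0;
        return new double[]{acc.count, avg, Math.sqrt(Math.max(var, 0.0))};
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator();
        add(10.0, acc);
        add(20.0, acc);
        double[] r = getResult(acc);
        System.out.println((long) r[0] + " " + r[1] + " " + r[2]);
    }
}
```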

Finally, you write your features to the feature group:

 // insert stream
featureGroup.insertStream(computedFeatures);

Apache Flink real-time feature pipelines produce a continuous stream of features, and Hopsworks syncs them to the Online Feature Store. The Online Feature Store saves the latest value per primary key. To save historical data for batch data analysis or model training, you need to start an offline materialization job. In Hopsworks, each stream-enabled feature group has a dedicated offline materialization job named:

[FEATURE_GROUP_NAME]_[FEATURE_GROUP_VERSION]_offline_fg_materialization.
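
For example, following this naming convention (the feature group name below is hypothetical), the job name can be derived as:

```java
// Derives the offline materialization job name from a feature group's
// name and version, following the convention described above.
class MaterializationJobName {

    static String jobName(String featureGroupName, int version) {
        return featureGroupName + "_" + version + "_offline_fg_materialization";
    }

    public static void main(String[] args) {
        // prints "card_transactions_10m_agg_1_offline_fg_materialization"
        System.out.println(jobName("card_transactions_10m_agg", 1));
    }
}
```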

You can start this job from the Feature Store UI or schedule it for periodic synchronization with the offline store. 

Other Hopsworks Capabilities you might find interesting