alibaba / feathub

FeatHub - A stream-batch unified feature store for real-time machine learning

License: Apache License 2.0


feathub's Introduction

FeatHub is a stream-batch unified feature store that simplifies feature development, deployment, monitoring, and sharing for machine learning applications.

Introduction

FeatHub is an open-source feature store designed to simplify the development and deployment of machine learning models. It supports feature ETL and provides an easy-to-use Python SDK that abstracts away the complexities of point-in-time correctness needed to avoid training-serving skew. With FeatHub, data scientists can speed up the feature deployment process and optimize feature ETL by automatically compiling declarative feature definitions into performant distributed ETL jobs using state-of-the-art computation engines of their choice, such as Flink or Spark.

Check out the Documentation for guidance on compute engines, connectors, the expression language, and more.

Core Benefits

Similar to other feature stores, FeatHub provides the following core benefits:

  • Simplified feature development: The Pythonic FeatHub SDK makes it easy to develop features without worrying about point-in-time correctness. This helps to avoid training-serving skew, which can negatively impact the accuracy of machine learning models.
  • Faster feature deployment: FeatHub automatically compiles user-specified declarative feature definitions into performant distributed ETL jobs using state-of-the-art computation engines, such as Flink or Spark. This speeds up the feature deployment process and eliminates the need for data engineers to re-write Python programs into distributed stream or batch processing jobs.
  • Performant feature generation: FeatHub offers a range of built-in optimizations that leverage commonly observed feature ETL job patterns. These optimizations are automatically applied to ETL jobs compiled from the declarative feature definitions, much like how SQL optimizations are applied.
  • Facilitated feature sharing: FeatHub allows developers to register and query feature definitions in a persistent feature registry. This capability reduces the duplication of data engineering efforts and the resource cost of feature generation by allowing developers in the organization to share and re-use existing feature definitions and datasets.

In addition to the above benefits, FeatHub provides several architectural benefits compared to other feature stores, including:

  • Real-time feature generation: FeatHub supports real-time feature generation with millisecond latency using Apache Flink as the stream computation engine. This provides better performance than other open-source feature stores that only support feature generation using Apache Spark.

  • Assisted feature monitoring: FeatHub provides built-in metrics to monitor the quality of features and alert users to issues such as feature drift. This helps to improve the accuracy and reliability of machine learning models.

  • Stream-batch unified computation: FeatHub allows for consistent feature computation across offline, nearline, and online stacks using Apache Flink for real-time features with low latency, Apache Spark for offline features with high throughput, and FeatureService for computing features online when the request is received.

  • Extensible framework: FeatHub's Python SDK is decoupled from the APIs of the underlying computation engines, providing flexibility and avoiding lock-in. This allows additional computation engines to be supported in the future. For example, in addition to Apache Flink and Apache Spark, FeatHub supports a Local Processor implemented using the Pandas library.

Usability is a crucial factor that sets feature store projects apart. Our SDK is designed to be Pythonic, declarative, intuitive, and highly expressive to support all the necessary feature transformations. We understand that a feature store's success depends on its usability as it directly affects developers' productivity. Check out the FeatHub SDK Highlights section below to learn more about the exceptional usability of our SDK.

What you can do with FeatHub

With FeatHub, you can:

  • Define new features: Define features as the result of applying expressions, aggregations, and cross-table joins on existing features, all with point-in-time correctness.
  • Read and write feature data: Read and write feature data into a variety of offline, nearline, and online storage systems for both offline training and online serving.
  • Backfill feature data: Process historical data with the given time range and/or keys to backfill feature data.
  • Run experiments: Run experiments on the local machine using LocalProcessor without connecting to an Apache Flink or Apache Spark cluster, then deploy the FeatHub program to a distributed Apache Flink or Apache Spark cluster by changing the program configuration (see the configuration sketch after this list).
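
The switch from local experimentation to a cluster deployment is a configuration change. Below is a minimal sketch, assuming the props layout shown in an issue later in this document; the import path and exact property names are assumptions and may differ across versions.

from feathub.feathub_client import FeathubClient  # import path is an assumption

# Switching "type" from "local" to "flink" moves the same program from the
# local machine to a Flink cluster; the rest of the program is unchanged.
local_props = {
    "processor": {"type": "local", "local": {}},
    "registry": {"type": "local", "local": {"namespace": "default"}},
    "feature_service": {"type": "local", "local": {}},
}
flink_props = {
    "processor": {"type": "flink", "flink": {"deployment_mode": "cli"}},
    "registry": {"type": "local", "local": {"namespace": "default"}},
    "feature_service": {"type": "local", "local": {}},
}

client = FeathubClient(props=local_props)  # or FeathubClient(props=flink_props)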

Architecture Overview

The architecture of FeatHub and its key components are shown in the figure below.

The workflow of defining, computing, and serving features using FeatHub is illustrated in the figure below.

See Basic Concepts for more details about the key components in FeatHub.

Supported Compute Engines

FeatHub supports multiple compute engines to execute feature ETL pipelines, including Apache Flink, Apache Spark, and a local processor.

FeatHub SDK Highlights

The following examples demonstrate how to define a variety of features concisely using the FeatHub SDK. See FeatHub SDK for more details.

See NYC Taxi Demo to learn more about how to define, generate and serve features using FeatHub SDK.

  • Define features via table joins with point-in-time correctness:
f_price = Feature(
    name="price",
    transform=JoinTransform(
        table_name="price_update_events",
        feature_name="price"
    ),
    keys=["item_id"],
)
  • Define over-window aggregation features:
f_total_payment_last_two_minutes = Feature(
    name="total_payment_last_two_minutes",
    transform=OverWindowTransform(
        expr="item_count * price",
        agg_func="SUM",
        window_size=timedelta(minutes=2),
        group_by_keys=["user_id"]
    )
)
  • Define sliding-window aggregation features:
f_total_payment_last_two_minutes = Feature(
    name="total_payment_last_two_minutes",
    transform=SlidingWindowTransform(
        expr="item_count * price",
        agg_func="SUM",
        window_size=timedelta(minutes=2),
        step_size=timedelta(minutes=1),
        group_by_keys=["user_id"]
    )
)
  • Define features via built-in functions and the FeatHub expression language:
f_trip_time_duration = Feature(
    name="f_trip_time_duration",
    transform="UNIX_TIMESTAMP(taxi_dropoff_datetime) - UNIX_TIMESTAMP(taxi_pickup_datetime)",
)
  • Define a feature via Python UDF:
f_lower_case_name = Feature(
    name="lower_case_name",
    dtype=types.String,
    transform=PythonUdfTransform(lambda row: row["name"].lower()),
)
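
Features like those above are attached to a source table through a feature view before they are computed. The following sketch uses the DerivedFeatureView pattern that appears later in this document; purchase_events_source is an assumed, previously defined source table.

purchase_events_with_features = DerivedFeatureView(
    name="purchase_events_with_features",
    source=purchase_events_source,  # assumption: a previously defined source table
    features=[
        f_price,
        f_total_payment_last_two_minutes,
    ],
    keep_source_fields=True,
)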

User Guide

Check out the Documentation for guidance on compute engines, connectors, the expression language, and more.

Prerequisites

You need the following to install and run FeatHub using pip:

  • Unix-like operating system (e.g. Linux, Mac OS X)
  • Python 3.7/3.8/3.9

Install FeatHub Nightly Build

To install the nightly version of FeatHub and the corresponding extra requirements based on the compute engine you plan to use, run one of the following commands:

# Run the following command if you plan to run FeatHub using a local process
$ python -m pip install --upgrade feathub-nightly

# Run the following command if you plan to use Apache Flink cluster
$ python -m pip install --upgrade "feathub-nightly[flink]"

# Run the following command if you plan to use Apache Spark cluster, or to use
# Spark-supported storage in a local process. 
$ python -m pip install --upgrade "feathub-nightly[spark]"
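
To confirm which build was installed, you can run a standard pip query:

$ python -m pip show feathub-nightly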

Quickstart

Quickstart using Local Processor

Execute the following command to compute the features defined in nyc_taxi.py in the local Python process.

$ python python/feathub/examples/nyc_taxi.py

Quickstart using Flink Processor

You can use the quickstart guides in the Documentation to compute features in a Flink cluster with different deployment modes.

Quickstart using Spark Processor

You can use the quickstart guides in the Documentation to compute features in a standalone Spark cluster.

Examples

The following examples can be run on Google Colab.

  • NYC Taxi Demo: Quickstart notebook that demonstrates how to define, extract, transform and materialize features with NYC taxi-fare prediction sample data.
  • Feature Embedding Demo: FeatHub UDF example showing how to define and use feature embedding with a pre-trained Transformer model and hotel review sample data.
  • Fraud Detection Demo: An example that demonstrates usage with multiple data sources such as user account and transaction data.

Examples in this repo can be run using docker-compose.

Developer Guide

Prerequisites

You need the following to build FeatHub from source:

  • Unix-like operating system (e.g. Linux, Mac OS X)
  • x86_64 architecture
  • Python 3.7/3.8/3.9
  • Java 8
  • Maven >= 3.1.1

Install Development Dependencies

  1. Install the required Python libraries.
$ python -m pip install -r python/dev-requirements.txt
  2. Start the docker engine and pull the required images.
$ docker image pull redis:latest
$ docker image pull confluentinc/cp-kafka:5.4.3
  3. Increase the open file limit to at least 1024.
$ ulimit -n 1024

Build and Install FeatHub from Source

$ mvn clean package -DskipTests -f ./java
$ python -m pip install "./python[flink]"
$ python -m pip install "./python[spark]"

Run Tests

Please execute the following commands under FeatHub's root folder to run tests.

$ mvn clean package -f ./java
$ pytest --tb=line -W ignore::DeprecationWarning ./python

While the commands above cover most of FeatHub's tests, some of the FlinkProcessor's Python tests, such as those related to the Parquet format, are ignored by default because they require a Hadoop environment to function correctly. To run these tests, install Hadoop on your local machine and set the following environment variable before executing the commands above.

export FEATHUB_TEST_HADOOP_CLASSPATH=`hadoop classpath`

You may refer to Flink's documentation for the Hive connector for the supported Hadoop and Hive versions.

Format Code Style

FeatHub uses the following tools to maintain code quality:

  • Black to format Python code
  • flake8 to check Python code style
  • mypy to check type annotation

Before uploading pull requests (PRs) for review, format the code, check code style, and check type annotations using the following commands:

# Format python code
$ python -m black ./python

# Check python code style
$ python -m flake8 --config=python/setup.cfg ./python

# Check python type annotation
$ python -m mypy --config-file python/setup.cfg ./python

Roadmap

Here is a list of key features that we plan to support:

  • Support all FeatureView transformations with FlinkProcessor
  • Support all FeatureView transformations with LocalProcessor
  • Support all FeatureView transformations with SparkProcessor
  • Support common online and offline feature storage systems (e.g. Kafka, Redis, Hive, MySQL)
  • Support persisting feature metadata in MySQL
  • Support exporting pre-defined and user-defined feature metrics to Prometheus
  • Support online transformation with feature service
  • Support feature metadata exploration (e.g. definition, lineage, metrics) with FeatHub UI

Contact Us

Chinese-speaking users are recommended to join the following DingTalk group for questions and discussion. Note that you need to join the "Apache Flink China" DingTalk organization via this link first in order to join the DingTalk group.

English-speaking users can use this invitation link to join our Slack channel for questions and discussion.

We are actively looking for user feedback and contributors from the community. Please feel free to create pull requests and open GitHub issues for feedback and feature requests.

Come join us!

Additional Resources

  • Documentation: Our documentation provides guidance on compute engines, connectors, expression language, and more. Check it out if you need help getting started or want to learn more about FeatHub.
  • FeatHub Examples: This repository provides a wide variety of FeatHub demos that can be executed using Docker Compose. It's a great resource if you want to try out FeatHub and see what it can do.
  • Tech Talks and Articles

feathub's People

Contributors

alibaba-oss, jiangxin369, lindong28, sxnan, yunfengzhou-hub

feathub's Issues

Update nightly-build workflow so that it does not report error if nightly build already exists

Currently, the nightly build workflow can be triggered either manually or by the daily schedule. If it has been triggered manually, the daily scheduled execution fails with a "File already exists" error, which causes unnecessary inconvenience for developers.

It might be better to avoid this situation by using Test PyPI to perform and check the upload first. See https://pypi.org/help/#file-name-reuse for more detail.
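
As an illustration of an alternative (an assumption, since the workflow's upload tool is not stated here): if the workflow uploads with twine, its --skip-existing flag turns a re-upload of an already-existing file into a no-op instead of an error.

# Hypothetical workflow step: tolerate an already-uploaded nightly artifact.
$ twine upload --skip-existing dist/*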

Improve FlinkProcessor's logic to remove same window output

Currently, FeatHub's FlinkProcessor decides whether the current window aggregation output is the same as the previous one by checking whether the content to be retracted or added in each step size is empty. This does not work as expected when late data or SlidingWindowTransform.limit is introduced, so the implementation should be fixed.

Feature should support translating string expression into JoinTransform

Users should be able to run the following code

f_price = Feature(
    name="price",
    dtype=types.Float32,
    transform="item_price_events.price",
    keys=["item_id"],
)

purchase_events_with_features = DerivedFeatureView(
    name="purchase_events_with_features",
    source=purchase_events_source,
    features=[
        f_price,
        f_total_payment_last_two_minutes,
    ],
    keep_source_fields=True,
)

Support creating FeathubClient from configuration file

In files like nyc_example.py, a FeathubClient is created by listing all the configuration parameters in a dict variable. It should also be possible to read the configuration from a JSON or YAML file and create the FeathubClient from it. After that, nyc_example.py, nyc_example_flink_session.py, and nyc_example_spark_client.py could be restructured into one Python file and three configuration files with different processor configurations.
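
A minimal sketch of what this could look like, assuming FeathubClient accepts the same props dict shown elsewhere in this document; the import path, helper name, and file path are assumptions.

import json

from feathub.feathub_client import FeathubClient  # import path is an assumption

def client_from_config(path: str) -> FeathubClient:
    """Hypothetical helper: build a FeathubClient from a JSON configuration file."""
    with open(path) as f:
        props = json.load(f)
    return FeathubClient(props=props)

client = client_from_config("conf/flink_processor.json")  # hypothetical file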

Add namespace to all registries

LocalRegistry uses a namespace to avoid accidentally overriding registered feature descriptors. This should be extended to all registries, including the MySQL registry.

FeatHub unit tests should not leak file descriptors

Running pytest -W ignore::DeprecationWarning might fail with the "Too many open files" error.

It appears that the number of open files with "PIPE" type keeps increasing over time. This can be verified by running the following command repeatedly after starting pytest.

ps aux | grep pytest | grep local | awk '{print $2}' | xargs lsof -p | grep PIPE  | wc -l

Support sinking multiple feature tables into the same online store table

FeatHub should allow materializing multiple TableDescriptors, which may have different schemas and keys, into the same Redis or memory online store. Unless the feature tables have entries with the same schema and keys but different feature values, FeatHub should not break or have correctness problems in this case.

Support grouping multiple materialize_features() into one job

We can add the following API to FeatHubClient:

def create_job_group(self) -> ProcessorJobGroup:
    ...

And here are the APIs of ProcessorJobGroup

class ProcessorJobGroup:
    """
    Provides APIs to get or wait for a job's execution result.
    """

    def __init__(self) -> None:
        pass

    def materialize_features(
        self,
        feature_descriptor: Union[str, TableDescriptor],
        sink: FeatureTable,
        ttl: Optional[timedelta] = None,
        start_datetime: Optional[datetime] = None,
        end_datetime: Optional[datetime] = None,
        allow_overwrite: bool = False,
    ):
        pass

    def start(self) -> ProcessorJob:
        pass
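
A hypothetical usage sketch of the proposed API, grouping two materializations into a single job; the feature view and sink variables are placeholders.

# Hypothetical usage of the proposed API above.
job_group = client.create_job_group()
job_group.materialize_features(feature_descriptor=features_a, sink=sink_a)
job_group.materialize_features(feature_descriptor=features_b, sink=sink_b)
job = job_group.start()  # both materializations run in one processor job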

Improve error message when the timestamp field contains corrupted data

Currently, if the timestamp field contains corrupted data, the Flink job fails with the message, RowTime field should not be null, please convert it to a non-null long value, which is not informative. We want to improve the message so that it can show the corrupted data and field.

Support handling columns with none values in OnlineMemoryStore

In #102, FlinkProcessor#to_pandas was updated to handle columns with None values. A similar problem might also exist in OnlineMemoryStore#get, since it contains the following code snippet:

features = (
    pd.concat(rows, axis=1)
    .transpose()
    .reset_index(drop=True)
    .astype(
        {
            field_name: to_numpy_dtype(schema.get_field_type(field_name))
            for field_name in schema.field_names
        }
    )
)

Thus OnlineMemoryStore might also need to be updated to fix potential bugs.
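
The risk can be reproduced with plain pandas, and it is the kind of failure the astype call above would hit when a column contains None and the target dtype is an integer:

import pandas as pd

# The None value becomes NaN, and casting NaN to int64 raises an error such as
# "Cannot convert non-finite values (NA or inf) to integer".
pd.Series([1, None]).astype("int64")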

Optimize SlidingWindow when window_size is infinite

When computing metrics using a sliding window, it is a common scenario that a metric is computed against all features the FeatHub job has processed so far, which means the window_size is infinite and no retraction is needed. We should optimize SlidingWindowTransform for this use case to improve its performance.

Decouple connector plugin from framework code

Currently, the framework code of FeatHub is coupled with concrete connector plugins. This code structure limits FeatHub's extensibility and increases the workload of adding support for new connectors to FeatHub.

Apply Flink native configuration before creating tableEnv instance

When attempting to create a FeathubClient with the following code:

client = FeathubClient(
    props={
        "processor": {
            "type": "flink",
            "flink": {
                "deployment_mode": "cli",
                "native.execution.runtime-mode": "BATCH"
            },
        },
        "registry": {
            "type": "local",
            "local": {
                "namespace": "default",
            },
        },
        "feature_service": {
            "type": "local",
            "local": {},
        },
    }
)

The following error is thrown when the program arrives at Table#to_pandas:

java.lang.IllegalArgumentException: Mismatch between configured runtime mode and actual runtime mode. Currently, the 'execution.runtime-mode' can only be set when instantiating the table environment. Subsequent changes are not supported. Please instantiate a new TableEnvironment if necessary.

Thus the configuration initialization process should be improved to avoid exceptions like this.

Remove join operation between window transforms with different limit parameter

Currently in the FlinkProcessor, SlidingWindowTransforms and OverWindowTransforms whose limit parameters differ are evaluated separately and joined together afterward, which adds performance overhead in some feature-generation cases. The processor should be optimized to evaluate such transformations with the same process function and remove the unnecessary joins.

Support reordering of features in FeatureView

FeatHub has the following code in feature_view.py:

        reordered_output_fields = []
        for field in source_fields:
            if field in output_fields:
                reordered_output_fields.append(field)
        reordered_output_fields.extend(output_fields)

Code like this should be removed to allow users to reorder the columns in the source table.

Support timestamp comparison when writing data to Redis

FlinkProcessor's RedisSink should support writing features with timestamps to Redis, where only features with larger timestamps are preserved in Redis.

This function has not been supported so far because, in a Redis cluster, the timestamp of a feature and the feature itself might be distributed onto different Redis nodes, and querying across nodes might bring a performance overhead. It was recently discovered that Redis cluster supports grouping keys with hashtags, and key-value pairs with the same hashtag are held by the same cluster node. Thus this performance overhead can now be worked around, as the sketch below illustrates.
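
For illustration, a feature and its timestamp could share a hashtag as follows; the key names are hypothetical, and Redis Cluster hashes only the part of the key inside {...}, so both keys are assigned to the same slot and therefore the same node.

import redis  # assumes the redis-py client

r = redis.Redis(host="localhost", port=6379)
# Both keys share the hashtag {item:42}, so they live on the same cluster node.
r.set("{item:42}:price", 19.99)
r.set("{item:42}:price:ts", 1688000000)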

Distinguish empty collection and none in Redis connector

When a FeatHub job writes a map-typed or list-typed feature into Redis with the FlinkProcessor, and the feature is an empty map or empty list, no corresponding entry is persisted in Redis, which is indistinguishable from the case when the feature is None. As a result, FeatHub jobs cannot distinguish between None values and empty collections when reading from RedisSources, potentially causing correctness problems.

To resolve this, we need to first check whether Redis behaves the same way when a key does not exist as when the key exists but corresponds to None or an empty collection. If this problem cannot be resolved due to limitations of the Redis API, FeatHub might need to write special values to Redis to distinguish between these cases.

Provide utility function to verify DataFrames

Currently, some tests in feathub verify a pandas DataFrame as follows.

self.assertTrue(expected_df.equals(result_df))

Such an implementation does not work in certain cases. For example, if an element in the two DataFrames is a dict whose order of entries differs, the DataFrames should pass the test, but the current implementation fails in this case.

Thus we should use a better way to verify DataFrames, such as the sketch below.
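
A minimal sketch of such a utility, assuming the order-sensitive elements are dict-valued cells: normalize dicts into sorted tuples before delegating to pandas' own comparison.

import pandas as pd

def assert_frames_equivalent(expected: pd.DataFrame, actual: pd.DataFrame) -> None:
    # Normalize dict-valued cells so that entry order does not affect equality.
    def normalize(value):
        if isinstance(value, dict):
            return tuple(sorted(value.items()))
        return value

    pd.testing.assert_frame_equal(
        expected.applymap(normalize), actual.applymap(normalize)
    )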

Check if the schema contains timestamp field if timestamp_field is not None

Currently, we can construct a Source whose timestamp_field is set but not contained in the schema, as in the following test. We should raise an exception in FeatHub instead of failing only when the job is submitted to a processor (see the sketch after the test).

    def test_get_output_fields(self):
        field_names = ["id", "val1", "val2", "val3"]
        source = DataGenSource(
            name="source_1",
            schema=Schema(field_names, [Int64, Int64, Int64, Int64]),
            timestamp_field="lpep_dropoff_datetime",
            timestamp_format="%Y-%m-%d %H:%M:%S",
            keys=["id"],
        )

        feature_view_2 = DerivedFeatureView(
            name="feature_view_2",
            source=source,
            features=["val2", "val3"],
            keep_source_fields=False,
        )

        built_feature_view_2 = feature_view_2.build(self.registry)

        if isinstance(built_feature_view_2, FeatureView):
            self.assertEqual(
                ["lpep_dropoff_datetime", "id", "val2", "val3"],
                built_feature_view_2.get_output_fields(field_names),
            )
        else:
            raise Exception("FeatureView is expected.")
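
A minimal sketch of the proposed check, placed where the Source is constructed; the attribute names follow the snippet above but are otherwise assumptions.

# Hypothetical validation at Source construction time.
if timestamp_field is not None and timestamp_field not in schema.field_names:
    raise ValueError(
        f"timestamp_field '{timestamp_field}' is not defined in the schema "
        f"fields {schema.field_names}."
    )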

Support more generic filtering functionality

FeatHub has a filter_expr attribute on OverWindowTransform and SlidingWindowTransform. These APIs cannot meet users' needs when they want to filter data without using window transformations. A more generic filtering functionality should be added to FeatHub to resolve this problem.
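
Purely as a hypothetical illustration (this is not an existing FeatHub API), a generic filter might reuse the expression language shown earlier in this document:

# Hypothetical API: filter rows of a feature view with a FeatHub expression.
filtered_purchases = DerivedFeatureView(
    name="large_purchases",
    source=purchase_events_source,
    features=["item_count", "price"],
    filter_expr="item_count * price > 100",  # hypothetical parameter
    keep_source_fields=True,
)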
