
alibaba / feathub


FeatHub - A stream-batch unified feature store for real-time machine learning

License: Apache License 2.0

Dockerfile 0.07% Python 75.57% Shell 0.87% Java 23.49%
apache-flink data-engineering data-science feature-engineering feature-store machine-learning streaming data data-quality mlops

feathub's Issues

Improve error message when the timestamp field contains corrupted data

Currently, if the timestamp field contains corrupted data, the Flink job fails with the message "RowTime field should not be null, please convert it to a non-null long value", which is not informative. We want to improve the message so that it shows the name of the field and the corrupted value.
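A minimal sketch of the kind of message we want, assuming timestamps arrive as strings parsed with a strftime-style format such as `%Y-%m-%d %H:%M:%S`. The helper `parse_timestamp` is hypothetical and is not part of FeatHub or Flink.

```python
from datetime import datetime
from typing import Any, Dict


# Hypothetical helper, not part of FeatHub or Flink.
def parse_timestamp(row: Dict[str, Any], field_name: str, fmt: str) -> datetime:
    """Parse row[field_name] with fmt, naming the field and the value on failure."""
    value = row.get(field_name)
    if value is None:
        raise ValueError(f"Timestamp field '{field_name}' is null in row {row!r}.")
    try:
        return datetime.strptime(value, fmt)
    except (TypeError, ValueError) as err:
        # Chain the original error so the low-level parsing detail is kept.
        raise ValueError(
            f"Cannot parse timestamp field '{field_name}' with value {value!r} "
            f"using format '{fmt}'."
        ) from err
```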

Support creating FeathubClient from configuration file

In files like nyc_example.py, a FeathubClient is created by listing all configuration parameters in a dict variable. FeatHub should also support reading the configuration from a JSON or YAML file and creating the FeathubClient from it. After that, nyc_example.py, nyc_example_flink_session.py and nyc_example_spark_client.py could be restructured into one Python file and three configuration files with different processor configurations.
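A minimal sketch of the loading step, assuming the file holds the same nested structure that is currently passed as `props` to `FeathubClient`. `load_props` is a hypothetical helper, and YAML support assumes the third-party PyYAML package is installed.

```python
import json
from pathlib import Path
from typing import Any, Dict


# Hypothetical helper, not part of FeatHub.
def load_props(path: str) -> Dict[str, Any]:
    """Read FeathubClient props from a .json, .yaml or .yml file."""
    file_path = Path(path)
    text = file_path.read_text(encoding="utf-8")
    suffix = file_path.suffix.lower()
    if suffix == ".json":
        return json.loads(text)
    if suffix in (".yaml", ".yml"):
        import yaml  # PyYAML, imported lazily so JSON-only users do not need it

        return yaml.safe_load(text)
    raise ValueError(f"Unsupported configuration file type: {suffix!r}")
```

With such a helper, each example could call `FeathubClient(props=load_props(path))` and differ only in the configuration file it points to.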

Support handling columns with none values in OnlineMemoryStore

In #102, FlinkProcessor#to_pandas was updated to handle columns with None values. A similar problem may also exist in OnlineMemoryStore#get, which contains the following snippet:

features = (
    pd.concat(rows, axis=1)
    .transpose()
    .reset_index(drop=True)
    .astype(
        {
            field_name: to_numpy_dtype(schema.get_field_type(field_name))
            for field_name in schema.field_names
        }
    )
)

OnlineMemoryStore might therefore need a similar fix: if a field contains None and its schema type maps to an integer NumPy dtype, the astype call above can fail.
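One possible fix, sketched under the assumption that leaving a column as object dtype is acceptable whenever it contains missing values: cast column by column and skip the cast for columns that contain None. `cast_columns` is a hypothetical helper, not FeatHub code; mapping to pandas nullable dtypes such as `"Int64"` would be an alternative.

```python
from typing import Dict

import pandas as pd


# Hypothetical helper, not part of FeatHub.
def cast_columns(df: pd.DataFrame, dtypes: Dict[str, object]) -> pd.DataFrame:
    """Cast each column to its target dtype unless it contains None/NaN."""
    result = df.copy()
    for name, dtype in dtypes.items():
        if result[name].isna().any():
            # A NumPy integer dtype cannot hold None, so keep the values as-is.
            continue
        result[name] = result[name].astype(dtype)
    return result
```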

Provide utility function to verify DataFrames

Currently, some tests in FeatHub verify pandas DataFrames as follows:

self.assertTrue(expected_df.equals(result_df))

This check does not work in certain cases. For example, if an element of the two DataFrames is a dict whose entries are in a different order, the test is expected to pass, but the current implementation fails.

We should therefore provide a utility function that verifies DataFrames more robustly.
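A minimal sketch of such a utility, assuming row order matters but column order does not. It compares cells with Python `==`, which ignores insertion order for dicts, and treats None and NaN as equal. `dataframes_equal` is a hypothetical name; `pandas.testing.assert_frame_equal` is an existing alternative that also reports where the frames differ.

```python
import math
from typing import Any

import pandas as pd


def _is_missing(value: Any) -> bool:
    # Only scalars count as missing; list/dict cells are compared with ==.
    return value is None or (isinstance(value, float) and math.isnan(value))


# Hypothetical test utility, not part of FeatHub.
def dataframes_equal(expected: pd.DataFrame, actual: pd.DataFrame) -> bool:
    """Compare two DataFrames cell by cell, ignoring column order."""
    if set(expected.columns) != set(actual.columns) or len(expected) != len(actual):
        return False
    for column in expected.columns:
        pairs = zip(expected[column].tolist(), actual[column].tolist())
        for left, right in pairs:
            if _is_missing(left) and _is_missing(right):
                continue
            if left != right:
                return False
    return True
```

A test would then read `self.assertTrue(dataframes_equal(expected_df, result_df))`.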

Distinguish empty collection and none in Redis connector

When a FeatHub job writes a map-typed or list-typed feature into Redis with FlinkProcessor and the feature is an empty map or empty list, no corresponding entry is persisted in Redis, which is the same outcome as when the feature is None. As a result, FeatHub jobs reading from a RedisSource cannot distinguish None values from empty collections, which can lead to correctness problems.

To resolve this, we first need to check whether Redis really behaves the same when a key does not exist as when the key exists but maps to None or an empty collection. If the problem cannot be resolved within the limitations of the Redis API, FeatHub might need to write special values to Redis to distinguish between these cases.
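One way to keep these states apart, sketched under the assumption that a collection feature can be stored as a single serialized string value rather than as a native Redis hash or list: None is represented by an absent key, while empty collections serialize to the non-empty strings `[]` and `{}` and therefore still occupy a key. A plain Python dict stands in for Redis so the sketch runs without a server, and map keys are assumed to be strings because JSON object keys are. `write_feature` and `read_feature` are hypothetical helpers.

```python
import json
from typing import Any, Dict, Optional


# Hypothetical helpers, not part of FeatHub; `store` stands in for Redis.
def write_feature(store: Dict[str, str], key: str, value: Any) -> None:
    """Persist a list/map feature as a JSON string; None removes the key."""
    if value is None:
        store.pop(key, None)
    else:
        # json.dumps([]) == "[]", so an empty collection still creates an entry.
        store[key] = json.dumps(value)


def read_feature(store: Dict[str, str], key: str) -> Optional[Any]:
    raw = store.get(key)
    return None if raw is None else json.loads(raw)
```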

Support reordering of features in FeatureView

FeatHub has the following code in feature_view.py:

        reordered_output_fields = []
        for field in source_fields:
            if field in output_fields:
                reordered_output_fields.append(field)
        reordered_output_fields.extend(output_fields)

Code like this should be removed so that users can choose the order of the output columns instead of having it follow the order of the source table.

Support grouping multiple materialize_features() into one job

We can add the following API to FeathubClient:

def create_job_group(self) -> ProcessorJobGroup:
    ...

ProcessorJobGroup would provide the following APIs:

class ProcessorJobGroup:
    """
    Groups multiple materialize_features() calls so that they are started
    together as a single job.
    """

    def __init__(self) -> None:
        pass

    def materialize_features(
        self,
        feature_descriptor: Union[str, TableDescriptor],
        sink: FeatureTable,
        ttl: Optional[timedelta] = None,
        start_datetime: Optional[datetime] = None,
        end_datetime: Optional[datetime] = None,
        allow_overwrite: bool = False,
    ):
        pass

    def start(self) -> ProcessorJob:
        pass
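To illustrate the intended behavior, here is a toy, in-memory stand-in for the proposed API. It only records the requests and hands them over together on `start()`. `ToyJobGroup` and `MaterializeRequest` are hypothetical names, and the feature descriptor and sink are plain strings instead of FeatHub's `TableDescriptor` and `FeatureTable`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional, Tuple


# Hypothetical types for illustration only, not FeatHub's implementation.
@dataclass(frozen=True)
class MaterializeRequest:
    feature_descriptor: str
    sink: str
    ttl: Optional[timedelta] = None
    start_datetime: Optional[datetime] = None
    end_datetime: Optional[datetime] = None
    allow_overwrite: bool = False


class ToyJobGroup:
    """Records materialize_features() calls and starts them as one unit."""

    def __init__(self) -> None:
        self._requests: List[MaterializeRequest] = []
        self._started = False

    def materialize_features(self, feature_descriptor: str, sink: str, **kwargs) -> None:
        if self._started:
            raise RuntimeError("Cannot add features after the group has started.")
        self._requests.append(MaterializeRequest(feature_descriptor, sink, **kwargs))

    def start(self) -> Tuple[MaterializeRequest, ...]:
        if not self._requests:
            raise RuntimeError("The job group is empty.")
        self._started = True
        # A real processor would compile all requests into one job graph here;
        # the toy version just returns them to show they are submitted together.
        return tuple(self._requests)
```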

Support sinking multiple feature tables into the same online store table

FeatHub should allow materializing multiple TableDescriptors, which may have different schemas and keys, into the same Redis or memory online store. As long as the feature tables do not contain entries with the same schema and keys but different feature values, FeatHub should neither fail nor produce incorrect results in this case.
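A minimal sketch of one way a memory store could hold rows from several tables: identify each row by its sorted (key name, key value) pairs, so tables with different key columns never collide, and merge feature columns into the existing row when the keys match. `ToyOnlineTable` is hypothetical and ignores TTL and concurrency.

```python
from typing import Any, Dict, List, Tuple

RowKey = Tuple[Tuple[str, Any], ...]


# Hypothetical store for illustration only, not FeatHub's OnlineMemoryStore.
class ToyOnlineTable:
    def __init__(self) -> None:
        self._rows: Dict[RowKey, Dict[str, Any]] = {}

    @staticmethod
    def _row_key(keys: Dict[str, Any]) -> RowKey:
        # Include key names so {"user_id": 1} and {"item_id": 1} stay distinct.
        return tuple(sorted(keys.items()))

    def put(self, keys: Dict[str, Any], features: Dict[str, Any]) -> None:
        # Merge instead of overwrite, so features written by another table survive.
        self._rows.setdefault(self._row_key(keys), {}).update(features)

    def get(self, keys: Dict[str, Any], feature_names: List[str]) -> Dict[str, Any]:
        row = self._rows.get(self._row_key(keys), {})
        return {name: row.get(name) for name in feature_names}
```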

Support more generic filtering functionality

FeatHub has a filter_expr attribute on OverWindowTransform and SlidingWindowTransform. These attributes do not help when users want to filter data without applying a window transformation. A more generic filtering functionality should be added to FeatHub to resolve this problem.
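To illustrate the intended semantics, row-level filtering that is independent of any window, here is a sketch with pandas, assuming filters are boolean expressions over column names similar to filter_expr. `DataFrame.query` evaluates pandas' own expression syntax, not FeatHub's, so this only shows the behavior and is not a proposed API; `filter_rows` is a hypothetical name.

```python
import pandas as pd


# Hypothetical helper showing the semantics only; not a FeatHub API.
def filter_rows(df: pd.DataFrame, expr: str) -> pd.DataFrame:
    """Keep only the rows for which the boolean expression is true."""
    return df.query(expr).reset_index(drop=True)
```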

Feature should support translating string expression into JoinTransform

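Users should be able to run the following code, in which the string expression "item_price_events.price" is translated into a JoinTransform that looks up the price feature of the item_price_events table: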

f_price = Feature(
    name="price",
    dtype=types.Float32,
    transform="item_price_events.price",
    keys=["item_id"],
)

purchase_events_with_features = DerivedFeatureView(
    name="purchase_events_with_features",
    source=purchase_events_source,
    features=[
        f_price,
        f_total_payment_last_two_minutes,
    ],
    keep_source_fields=True,
)
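A sketch of the parsing step, assuming the translation only applies when the whole expression has the form `<table_name>.<feature_name>` with both parts being plain identifiers; anything else would keep being treated as an ordinary expression. `parse_join_expression` is a hypothetical helper; the returned pair corresponds to the table name and feature name that a JoinTransform would presumably need.

```python
import re
from typing import Optional, Tuple

# Both parts must be identifiers, e.g. "item_price_events.price".
_JOIN_PATTERN = re.compile(r"([A-Za-z_]\w*)\.([A-Za-z_]\w*)")


# Hypothetical helper, not part of FeatHub.
def parse_join_expression(expr: str) -> Optional[Tuple[str, str]]:
    """Return (table_name, feature_name) if expr references another table's feature."""
    match = _JOIN_PATTERN.fullmatch(expr.strip())
    if match is None:
        return None
    return match.group(1), match.group(2)
```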

Apply Flink native configuration before creating tableEnv instance

When creating a FeathubClient with the following code:

from feathub.feathub_client import FeathubClient

client = FeathubClient(
    props={
        "processor": {
            "type": "flink",
            "flink": {
                "deployment_mode": "cli",
                "native.execution.runtime-mode": "BATCH"
            },
        },
        "registry": {
            "type": "local",
            "local": {
                "namespace": "default",
            },
        },
        "feature_service": {
            "type": "local",
            "local": {},
        },
    }
)

the following error is thrown when the program reaches Table#to_pandas:

java.lang.IllegalArgumentException: Mismatch between configured runtime mode and actual runtime mode. Currently, the 'execution.runtime-mode' can only be set when instantiating the table environment. Subsequent changes are not supported. Please instantiate a new TableEnvironment if necessary.

The configuration initialization process should therefore be improved so that native Flink configuration, such as execution.runtime-mode, is applied before the TableEnvironment is instantiated.
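A sketch of the first step, assuming native options are passed under keys with a `native.` prefix as in the example above: collect them into a plain dict before any TableEnvironment exists, so that they can be applied when the environment is created instead of afterwards. `extract_native_config` is a hypothetical helper.

```python
from typing import Any, Dict

NATIVE_PREFIX = "native."


# Hypothetical helper, not part of FeatHub.
def extract_native_config(flink_props: Dict[str, Any]) -> Dict[str, str]:
    """Strip the 'native.' prefix and return the Flink options as strings."""
    return {
        key[len(NATIVE_PREFIX):]: str(value)
        for key, value in flink_props.items()
        if key.startswith(NATIVE_PREFIX)
    }
```

Applying this dict at construction time fixes the runtime mode from the start, so it never has to be changed on an existing TableEnvironment.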

Improve FlinkProcessor's logic to remove same window output

Currently, FeatHub's FlinkProcessor decides whether the current window aggregation output is the same as the previous one by checking whether anything needs to be retracted from or added to the window in each step. This does not work as expected when late data arrives or when SlidingWindowTransform.limit is set, so this implementation should be fixed.
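One alternative, sketched under the assumption that window results are emitted per key and step: compare each newly computed aggregate with the last value emitted for that key, and suppress the output only when the two are equal. Because the check looks at the result itself, it does not depend on how the result was reached (late data, limit, and so on). `dedup_window_outputs` is a hypothetical function.

```python
from typing import Any, Dict, Hashable, Iterable, Iterator, Tuple


# Hypothetical function, not FeatHub's implementation.
def dedup_window_outputs(
    outputs: Iterable[Tuple[Hashable, int, Any]]
) -> Iterator[Tuple[Hashable, int, Any]]:
    """Yield (key, step_time, value) only when value differs from the last one emitted for key."""
    last_emitted: Dict[Hashable, Any] = {}
    for key, step_time, value in outputs:
        if key in last_emitted and last_emitted[key] == value:
            continue  # same result as before, so nothing new to emit
        last_emitted[key] = value
        yield key, step_time, value
```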

Support timestamp comparison when writing data to Redis

FlinkProcessor's RedisSink should support writing features together with their timestamps to Redis, so that only the feature values with larger timestamps are preserved in Redis.

This has not been supported so far because, in a Redis cluster, a feature and its timestamp might be stored on different Redis nodes, and querying across nodes could add performance overhead. It was recently discovered that Redis cluster supports grouping keys with hash tags: key-value pairs with the same hash tag are held by the same cluster node. This performance overhead can therefore be worked around now.
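A sketch of both parts, assuming each feature value and its timestamp are stored under two keys that share a hash tag built from the entity key. `hash_slot` follows the Redis Cluster rule (CRC16/XMODEM of the key, or of the non-empty substring between the first `{` and the following `}`, modulo 16384). `write_if_newer` is a hypothetical helper that uses a dict in place of Redis; against a real cluster the read-compare-write step would need to be atomic, for example in a Lua script run with EVAL, which is possible because both keys hash to the same slot.

```python
from typing import Dict


def crc16_xmodem(data: bytes) -> int:
    """CRC16/XMODEM (polynomial 0x1021, initial value 0), as used by Redis Cluster."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021) if crc & 0x8000 else (crc << 1)
            crc &= 0xFFFF
    return crc


def hash_slot(key: str) -> int:
    """Compute the cluster slot of a key, honoring a non-empty {hash tag}."""
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end > start + 1:
            key = key[start + 1:end]
    return crc16_xmodem(key.encode("utf-8")) % 16384


# Hypothetical helper; `store` stands in for Redis.
def write_if_newer(store: Dict[str, str], entity: str, value: str, timestamp: int) -> bool:
    """Write value only if timestamp is newer than the stored one."""
    value_key = f"{{{entity}}}:value"  # e.g. "{user_1}:value"
    ts_key = f"{{{entity}}}:ts"  # same hash tag, hence same slot
    current = store.get(ts_key)
    if current is not None and int(current) >= timestamp:
        return False  # keep the existing feature, which is at least as new
    store[value_key] = value
    store[ts_key] = str(timestamp)
    return True
```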

Decouple connector plugin from framework code

Currently, FeatHub's framework code is coupled with concrete connector plugins. This code structure limits FeatHub's extensibility and increases the workload of adding support for new connectors.
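One common way to decouple them, sketched here as an assumption rather than FeatHub's design: the framework only knows a registry that maps a connector type name to a factory, and each connector module registers itself. `register_connector`, `create_connector` and `PrintSink` are hypothetical names.

```python
from typing import Any, Callable, Dict, TypeVar

F = TypeVar("F", bound=Callable[..., Any])

# Hypothetical registry, filled in by connector modules; the framework never
# imports those modules directly.
_CONNECTOR_FACTORIES: Dict[str, Callable[..., Any]] = {}


def register_connector(type_name: str) -> Callable[[F], F]:
    """Decorator that registers a connector class or factory under type_name."""

    def decorator(factory: F) -> F:
        if type_name in _CONNECTOR_FACTORIES:
            raise ValueError(f"Connector {type_name!r} is already registered.")
        _CONNECTOR_FACTORIES[type_name] = factory
        return factory

    return decorator


def create_connector(type_name: str, **props: Any) -> Any:
    try:
        factory = _CONNECTOR_FACTORIES[type_name]
    except KeyError:
        raise ValueError(f"Unknown connector type: {type_name!r}") from None
    return factory(**props)


# Example plugin, which would normally live in its own module or package.
@register_connector("print")
class PrintSink:
    def __init__(self, prefix: str = "") -> None:
        self.prefix = prefix
```

Discovery of third-party plugins could then be built on Python entry points, so that installing a package is enough to make its connectors available.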

Check if the schema contains timestamp field if timestamp_field is not None

Currently, a Source can be constructed with timestamp_field set to a field that is not contained in its schema, as in the following test. FeatHub should raise an exception when the Source is constructed, instead of failing only after the job is submitted to a processor.

    def test_get_output_fields(self):
        field_names = ["id", "val1", "val2", "val3"]
        source = DataGenSource(
            name="source_1",
            schema=Schema(field_names, [Int64, Int64, Int64, Int64]),
            timestamp_field="lpep_dropoff_datetime",
            timestamp_format="%Y-%m-%d %H:%M:%S",
            keys=["id"],
        )

        feature_view_2 = DerivedFeatureView(
            name="feature_view_2",
            source=source,
            features=["val2", "val3"],
            keep_source_fields=False,
        )

        built_feature_view_2 = feature_view_2.build(self.registry)

        if isinstance(built_feature_view_2, FeatureView):
            self.assertEqual(
                ["lpep_dropoff_datetime", "id", "val2", "val3"],
                built_feature_view_2.get_output_fields(field_names),
            )
        else:
            raise Exception("FeatureView is expected.")
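A minimal sketch of the proposed check, assuming it runs in the Source constructor once the schema is known. `validate_timestamp_field` is a hypothetical helper; with the DataGenSource above, it would reject `lpep_dropoff_datetime` because the schema only contains `id`, `val1`, `val2` and `val3`.

```python
from typing import Optional, Sequence


# Hypothetical helper, not part of FeatHub.
def validate_timestamp_field(
    field_names: Sequence[str], timestamp_field: Optional[str]
) -> None:
    """Raise ValueError if timestamp_field is set but missing from the schema."""
    if timestamp_field is not None and timestamp_field not in field_names:
        raise ValueError(
            f"timestamp_field '{timestamp_field}' is not in the schema fields "
            f"{list(field_names)}."
        )
```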

Optimize SlidingWindow when window_size is infinite

When computing metrics with sliding windows, a common scenario is that a metric is computed over all the features the FeatHub job has processed so far, which means window_size is infinite and no retraction is needed. SlidingWindowTransform should be optimized for this use case to improve its performance.
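A sketch of why this case is cheaper, assuming an aggregation such as sum or count: with an infinite window nothing ever leaves the window, so each key only needs a running accumulator instead of a buffer of rows to retract. `InfiniteWindowSum` is a hypothetical class.

```python
from collections import defaultdict
from typing import DefaultDict, Hashable, Tuple


# Hypothetical class for illustration only, not FeatHub's implementation.
class InfiniteWindowSum:
    """Running sum and count per key; constant state per key, no retraction."""

    def __init__(self) -> None:
        self._sums: DefaultDict[Hashable, float] = defaultdict(float)
        self._counts: DefaultDict[Hashable, int] = defaultdict(int)

    def add(self, key: Hashable, value: float) -> Tuple[float, int]:
        # Only accumulate: with an unbounded window no row ever expires.
        self._sums[key] += value
        self._counts[key] += 1
        return self._sums[key], self._counts[key]
```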

Add namespace to all registries

LocalRegistry uses a namespace to avoid accidentally overriding registered feature descriptors. This should be extended to all registries, including the MySQL registry.
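A minimal sketch of namespace scoping, assuming descriptors are identified by name: keying entries by (namespace, name) lets two namespaces register the same name, while re-registering within one namespace requires an explicit override. In the MySQL registry, the same idea would correspond to a unique key over (namespace, name). `NamespacedRegistry` is hypothetical.

```python
from typing import Any, Dict, Tuple


# Hypothetical registry for illustration only, not FeatHub's LocalRegistry.
class NamespacedRegistry:
    def __init__(self) -> None:
        self._entries: Dict[Tuple[str, str], Any] = {}

    def register(self, namespace: str, name: str, descriptor: Any, override: bool = False) -> None:
        key = (namespace, name)
        if key in self._entries and not override:
            raise ValueError(f"{name!r} is already registered in namespace {namespace!r}.")
        self._entries[key] = descriptor

    def get(self, namespace: str, name: str) -> Any:
        return self._entries[(namespace, name)]
```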

Update nightly-build workflow so that it does not report error if nightly build already exists

Currently, the nightly build workflow can be triggered either manually or by the daily schedule. If it has already been triggered manually, the scheduled execution fails with a "File already exists" error, which causes unnecessary inconvenience for developers.

It might be better to avoid this situation by first using Test PyPI to perform and check the upload. See https://pypi.org/help/#file-name-reuse for more details.

FeatHub unit tests should not leak file descriptor

Running pytest -W ignore::DeprecationWarning might fail with the "Too many open files" error.

It appears that the number of open files with "PIPE" type keeps increasing over time. This can be verified by running the following command repeatedly after starting pytest.

ps aux | grep pytest | grep local | awk '{print $2}' | xargs lsof -p | grep PIPE  | wc -l
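A Python counterpart to the command above, assuming Linux, where each entry in `/proc/<pid>/fd` is a symlink whose target looks like `pipe:[12345]` for pipes. `count_pipe_fds` is a hypothetical helper; calling it before and after a test (for example from a pytest fixture) would show which tests leak pipes.

```python
import os


# Hypothetical helper (Linux only), not part of FeatHub.
def count_pipe_fds(pid: str = "self") -> int:
    """Count the open file descriptors of a process that refer to pipes."""
    fd_dir = f"/proc/{pid}/fd"
    count = 0
    for name in os.listdir(fd_dir):
        try:
            target = os.readlink(os.path.join(fd_dir, name))
        except OSError:
            continue  # the descriptor was closed while we were iterating
        if target.startswith("pipe:"):
            count += 1
    return count
```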

Remove join operation between window transforms with different limit parameter

Currently, in FlinkProcessor, SlidingWindowTransforms and OverWindowTransforms with different limit parameters are evaluated separately and then joined together, which adds performance overhead in some feature-generation cases. Such transforms should instead be evaluated by the same process function, removing the unnecessary joins.
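A sketch of the idea, assuming the limit keeps the most recent N rows per key and the aggregation is a sum: one buffer sized to the largest limit is enough to compute every limit in the same pass, so no join is needed afterwards. `sums_for_limits` is a hypothetical function that processes a single key's rows in arrival order.

```python
from collections import deque
from typing import Deque, Dict, Iterable, List, Sequence


# Hypothetical function for illustration only, not FeatHub's implementation.
def sums_for_limits(values: Iterable[float], limits: Sequence[int]) -> List[Dict[int, float]]:
    """For each incoming value, return {limit: sum of the last `limit` values}."""
    if not limits or min(limits) < 1:
        raise ValueError("limits must be positive integers")
    buffer: Deque[float] = deque(maxlen=max(limits))  # shared by all limits
    results = []
    for value in values:
        buffer.append(value)
        recent = list(buffer)
        results.append({limit: sum(recent[-limit:]) for limit in limits})
    return results
```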
