alibaba / feathub
FeatHub - A stream-batch unified feature store for real-time machine learning
License: Apache License 2.0
Currently, if the timestamp field contains corrupted data, the Flink job fails with the message "RowTime field should not be null, please convert it to a non-null long value", which is not informative. We should improve the message so that it shows the corrupted data and the field that contains it.
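As a rough illustration of the kind of message that would be more helpful, here is a minimal Python sketch. The function name `require_timestamp` and the dict-based row are assumptions made for this example; they are not FeatHub or Flink APIs, and the real check would live in the Flink job.

```python
from typing import Any, Mapping


def require_timestamp(row: Mapping[str, Any], timestamp_field: str) -> int:
    """Return the row's timestamp as an int, or raise a descriptive error.

    Hypothetical helper: it only illustrates an error message that names
    the offending field and shows the corrupted value and row.
    """
    value = row.get(timestamp_field)
    try:
        if value is None:
            raise ValueError("value is missing or null")
        return int(value)
    except (TypeError, ValueError) as e:
        raise ValueError(
            f"Invalid value {value!r} in timestamp field "
            f"'{timestamp_field}' of row {dict(row)!r}: {e}"
        ) from e
```

With a message like this, the user can see both which field failed and which record caused the failure, instead of only learning that the row time was null.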
In files like nyc_example.py, a FeathubClient is created by listing all the configuration parameters in a dict variable. It should also be possible to read the configuration from a JSON or YAML file and create the FeathubClient from it. After that, nyc_example.py, nyc_example_flink_session.py, and nyc_example_spark_client.py could be restructured into one Python file and three configuration files with different processor configurations.
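One possible shape for this, sketched for the JSON case only: the helper name `load_props` and the file name in the usage comment are hypothetical, and a YAML variant would need a third-party parser that is not shown here.

```python
import json
from pathlib import Path
from typing import Any, Dict


def load_props(path: str) -> Dict[str, Any]:
    """Read client properties from a JSON file (hypothetical helper)."""
    with Path(path).open(encoding="utf-8") as f:
        props = json.load(f)
    # The client expects a dict of properties, so reject other JSON values.
    if not isinstance(props, dict):
        raise ValueError(f"{path} must contain a JSON object at the top level")
    return props


# Assumed usage; "flink_session.json" is an illustrative file name:
# client = FeathubClient(props=load_props("flink_session.json"))
```

With such a helper, the example script would stay the same across processors and only the configuration file passed to it would change.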
In #102, FlinkProcessor#to_pandas was updated to handle columns with None values. A similar problem might also exist in OnlineMemoryStore#get, since it contains the following code snippet:
features = (
    pd.concat(rows, axis=1)
    .transpose()
    .reset_index(drop=True)
    .astype(
        {
            field_name: to_numpy_dtype(schema.get_field_type(field_name))
            for field_name in schema.field_names
        }
    )
)
Thus OnlineMemoryStore may also need to be updated to fix potential bugs.
FeatHub should be able to derive the feature's dtype automatically in common cases.
Currently, some tests in FeatHub verify a pandas DataFrame as follows:
self.assertTrue(expected_df.equals(result_df))
This approach does not work in certain cases. For example, if an element in the two DataFrames is a dict whose entries are in a different order, the test is supposed to pass, but the current implementation fails.
Thus we should use a better way to verify DataFrames.
When a FeatHub job writes a map-typed or list-typed feature into Redis with FlinkProcessor, an empty map or empty list produces no corresponding entry in Redis, which is equivalent to the case where the feature is None. As a result, FeatHub jobs cannot distinguish between None values and empty collections when reading from RedisSources, which can cause correctness problems.
To resolve this, we first need to check whether Redis really behaves the same way when a key does not exist and when the key exists but corresponds to None or an empty collection. If this problem cannot be resolved due to the limitations of the Redis API, FeatHub might need to write special values to Redis to distinguish between these cases.
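The "special values" idea could look roughly like the sketch below. A plain dict stands in for Redis, and the marker string and both function names are assumptions for illustration, not FeatHub's actual encoding.

```python
from typing import Callable, Dict, List, Optional, Union

Collection = Union[Dict[str, object], List[object]]

# Hypothetical marker; it must never collide with a real stored value.
EMPTY_MARKER = "__empty_collection__"


def write_feature(
    store: Dict[str, object], key: str, value: Optional[Collection]
) -> None:
    """Write a collection-typed feature into the stand-in store."""
    if value is None:
        store.pop(key, None)  # None: leave no entry at all
    elif len(value) == 0:
        store[key] = EMPTY_MARKER  # empty collection: explicit marker
    else:
        store[key] = value


def read_feature(
    store: Dict[str, object], key: str, empty_factory: Callable[[], Collection]
) -> Optional[object]:
    """Read a feature back; empty_factory is `dict` or `list` per the schema."""
    if key not in store:
        return None
    value = store[key]
    if value == EMPTY_MARKER:
        return empty_factory()
    return value
```

The reader relies on the feature's declared type to decide whether the marker decodes to an empty map or an empty list, so a single marker value is enough in this sketch.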
We should also replace timestamp format values such as epoch and epoch_millis with enums.
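A minimal sketch of what such an enum might look like follows. The class name `TimestampFormat`, the helper `to_epoch_millis`, and the reading of `epoch` as seconds are assumptions for this example; pattern-style formats such as "%Y-%m-%d %H:%M:%S" would still need to be passed as strings.

```python
from enum import Enum


class TimestampFormat(Enum):
    """Hypothetical enum for the non-pattern timestamp formats."""

    EPOCH = "epoch"  # assumed to mean seconds since the Unix epoch
    EPOCH_MILLIS = "epoch_millis"  # milliseconds since the Unix epoch


def to_epoch_millis(value: int, fmt: TimestampFormat) -> int:
    """Normalize a numeric timestamp to milliseconds."""
    if fmt is TimestampFormat.EPOCH:
        return value * 1000
    if fmt is TimestampFormat.EPOCH_MILLIS:
        return value
    raise ValueError(f"Unsupported timestamp format: {fmt!r}")
```

Compared with bare strings, a misspelled value such as `TimestampFormat("epoc")` raises a `ValueError` immediately when the enum member is looked up, rather than later when the job runs.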
FeatHub has the following code in feature_view.py:
reordered_output_fields = []
for field in source_fields:
    if field in output_fields:
        reordered_output_fields.append(field)
reordered_output_fields.extend(output_fields)
Code like this should be removed so that users can reorder the columns of the source table.
We can add the following API to FeathubClient:
def create_job_group(self) -> ProcessorJobGroup:
    ...
And here are the APIs of ProcessorJobGroup:
class ProcessorJobGroup:
    """
    Provides APIs to get or wait for a job's execution result.
    """

    def __init__(self) -> None:
        pass

    def materialize_features(
        self,
        feature_descriptor: Union[str, TableDescriptor],
        sink: FeatureTable,
        ttl: Optional[timedelta] = None,
        start_datetime: Optional[datetime] = None,
        end_datetime: Optional[datetime] = None,
        allow_overwrite: bool = False,
    ):
        pass

    def start(self) -> ProcessorJob:
        pass
FeatHub should allow materializing multiple TableDescriptors, which may have different schemas and keys, into the same Redis or memory online store. Unless the feature tables have entries with the same schema and keys but different feature values, FeatHub should not break or have correctness problems in the case described above.
FeatHub has a filter_expr attribute on OverWindowTransform and SlidingWindowTransform. These APIs do not meet the needs of users who want to filter data without using window transformations. A more generic filtering functionality should be added to FeatHub to resolve this problem.
Users should be able to run the following code:
f_price = Feature(
    name="price",
    dtype=types.Float32,
    transform="item_price_events.price",
    keys=["item_id"],
)
purchase_events_with_features = DerivedFeatureView(
    name="purchase_events_with_features",
    source=purchase_events_source,
    features=[
        f_price,
        f_total_payment_last_two_minutes,
    ],
    keep_source_fields=True,
)
Maybe use Binder similar to https://github.com/fangohr/jupyter-demo.
Suppose a FeathubClient is created with the following code:
client = FeathubClient(
    props={
        "processor": {
            "type": "flink",
            "flink": {
                "deployment_mode": "cli",
                "native.execution.runtime-mode": "BATCH",
            },
        },
        "registry": {
            "type": "local",
            "local": {
                "namespace": "default",
            },
        },
        "feature_service": {
            "type": "local",
            "local": {},
        },
    }
)
The following error is thrown when the program reaches Table#to_pandas:
java.lang.IllegalArgumentException: Mismatch between configured runtime mode and actual runtime mode. Currently, the 'execution.runtime-mode' can only be set when instantiating the table environment. Subsequent changes are not supported. Please instantiate a new TableEnvironment if necessary.
Thus the configuration initialization process should be improved to avoid exceptions like this.
It would be useful to support the following expression:
DerivedFeatureView(
    name="feature_view_1",
    source=source,
    features=[
        "remote_table.feature_name + 10",
    ],
)
The QR code has expired. I would like to join the group and learn together. Please update the QR code, thanks!
See https://stackoverflow.com/questions/75074182/feature-and-featureview-versioning
This can make it easy for data scientists to do experiments and compare which version of feature can produce better results.
Currently, FeatHub's FlinkProcessor determines whether the current window aggregation output is the same as the previous one by checking whether the content to be retracted or added in each step is empty. This approach does not work as expected when late data or SlidingWindowTransform.limit is involved. Thus this implementation should be fixed.
FlinkProcessor's RedisSink should support writing features with timestamps to Redis, so that only the features with larger timestamps are preserved in Redis.
This function has not been supported so far because, in a Redis cluster, the timestamp of a feature and the feature itself might be distributed onto different Redis nodes, and querying across nodes might bring a performance overhead. It was recently discovered that Redis Cluster supports grouping keys with hash tags, and that keys with the same hash tag are held by the same cluster node. Thus this performance overhead can now be worked around.
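To make the hash tag idea concrete, here is a small sketch that uses a plain dict in place of Redis. The key layout and all function names are assumptions for illustration. In Redis Cluster, when a key contains a non-empty `{...}` section, only the text between the first `{` and the next `}` is hashed, so the two keys below would map to the same slot.

```python
from typing import Dict


def feature_key(table: str, entity_key: str) -> str:
    """Key for the feature value; the {...} part is the hash tag."""
    return f"{{{table}:{entity_key}}}:value"


def timestamp_key(table: str, entity_key: str) -> str:
    """Key for the feature's timestamp, sharing the same hash tag."""
    return f"{{{table}:{entity_key}}}:ts"


def hash_tag(key: str) -> str:
    """Return the part of the key that Redis Cluster would hash."""
    start = key.find("{")
    if start == -1:
        return key
    end = key.find("}", start + 1)
    if end == -1 or end == start + 1:  # no closing brace, or empty tag
        return key
    return key[start + 1 : end]


def write_if_newer(
    store: Dict[str, object], table: str, entity_key: str, value: object, ts: int
) -> bool:
    """Keep the value only if its timestamp is larger than the stored one."""
    ts_key = timestamp_key(table, entity_key)
    current = store.get(ts_key)
    if current is not None and current >= ts:
        return False
    store[feature_key(table, entity_key)] = value
    store[ts_key] = ts
    return True
```

Against a real cluster the compare-and-write step would have to be atomic, for example in a server-side script, which is only possible because both keys live in the same slot. Entity keys containing `}` would break the tag and would need escaping.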
This can be achieved by re-using the aggregation function added in FeatHub.
Feathub unit test occasionally fails with the following exception:
https://github.com/lindong28/feathub/actions/runs/3307702206/jobs/5459525283
Once the config is set to true, FeatHub can add an extra FeatureView after every user-specified FeatureView. The extra FeatureView prints the values from the input FeatureView and outputs these values without any change.
Thanks for your wonderful work. I want to use FeatHub in my project and would like to know how to use "Reusing the Same Data History for Multiple Sliding Window Features" in my feature view config.
Currently, the framework code of FeatHub is coupled with concrete connector plugins. This code structure hurts FeatHub's extensibility and increases the workload of adding support for different connectors.
Currently, we can construct a Source whose timestamp_field is set but not contained in the schema, as in the following test. FeatHub should raise an exception when the Source is constructed instead of failing only after the job is submitted to a processor.
def test_get_output_fields(self):
    field_names = ["id", "val1", "val2", "val3"]
    source = DataGenSource(
        name="source_1",
        schema=Schema(field_names, [Int64, Int64, Int64, Int64]),
        timestamp_field="lpep_dropoff_datetime",
        timestamp_format="%Y-%m-%d %H:%M:%S",
        keys=["id"],
    )
    feature_view_2 = DerivedFeatureView(
        name="feature_view_2",
        source=source,
        features=["val2", "val3"],
        keep_source_fields=False,
    )
    built_feature_view_2 = feature_view_2.build(self.registry)
    if isinstance(built_feature_view_2, FeatureView):
        self.assertEqual(
            ["lpep_dropoff_datetime", "id", "val2", "val3"],
            built_feature_view_2.get_output_fields(field_names),
        )
    else:
        raise Exception("FeatureView is expected.")
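The early check requested for the test above could be as small as the following sketch. The function name `check_timestamp_field` is hypothetical, and where exactly such a check would be called in FeatHub (for example in a Source constructor) is an assumption.

```python
from typing import Optional, Sequence


def check_timestamp_field(
    field_names: Sequence[str], timestamp_field: Optional[str]
) -> None:
    """Raise early if timestamp_field is set but missing from the schema."""
    if timestamp_field is not None and timestamp_field not in field_names:
        raise ValueError(
            f"timestamp_field '{timestamp_field}' is not in the schema "
            f"fields {list(field_names)}"
        )
```

With this in place, the source in the test above would be rejected at construction time, because "lpep_dropoff_datetime" is not one of "id", "val1", "val2", "val3".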
When computing metrics using a sliding window, it is a common scenario that a metric is computed over all the features the FeatHub job has processed so far, which means the window_size is infinite and no retraction is needed. We should optimize SlidingWindowTransform for this use case to improve its performance.
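The reason this case can be cheaper is that rows never expire from an infinite window, so no row has to be buffered for later retraction. A simplified illustration, assuming a sum aggregation over (key, value) events (this is not FlinkProcessor code):

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, Iterator, Tuple


def running_sum(
    events: Iterable[Tuple[Hashable, float]]
) -> Iterator[Tuple[Hashable, float]]:
    """Emit the per-key sum over everything seen so far.

    Only one accumulator per key is kept; individual rows are not
    stored because nothing ever leaves the window.
    """
    totals: Dict[Hashable, float] = defaultdict(float)
    for key, value in events:
        totals[key] += value
        yield key, totals[key]
```

State size then depends on the number of keys rather than on the number of rows inside the window.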
When running nyc_taxi.py, a .parser.out file is generated in the same directory. FeatHub should provide configuration options to avoid generating such log files in non-debug situations.
We can add a parameter named HandleInvalid whose value can be either "skip" or "error".
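One way such a parameter might behave is sketched below. The function `filter_invalid`, the `is_valid` callback, and the snake_case spelling `handle_invalid` are assumptions for illustration, since the proposal does not say where the parameter would be attached or what counts as invalid.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def filter_invalid(
    rows: Iterable[T],
    is_valid: Callable[[T], bool],
    handle_invalid: str = "error",
) -> Iterator[T]:
    """Yield valid rows; skip or fail on invalid ones per handle_invalid."""
    # Validate the option eagerly, before any row is consumed.
    if handle_invalid not in ("skip", "error"):
        raise ValueError("handle_invalid must be 'skip' or 'error'")

    def generate() -> Iterator[T]:
        for row in rows:
            if is_valid(row):
                yield row
            elif handle_invalid == "error":
                raise ValueError(f"Invalid row: {row!r}")
            # handle_invalid == "skip": drop the row silently

    return generate()
```

Defaulting to "error" in this sketch keeps invalid data from being dropped silently unless the user opts in.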
LocalRegistry uses a namespace to avoid accidentally overriding registered feature descriptors. This should be extended to all registries, including the MySQL registry.
Currently, the nightly build workflow can be triggered either manually or by the daily schedule. If it has been triggered manually, the daily scheduled execution will fail with a "File already exists" error, which is an unnecessary inconvenience for developers.
It might be better to avoid this situation by using Test PyPI to perform and check the upload first. See https://pypi.org/help/#file-name-reuse for more detail.
Feathub does not check the existence of license contents in its files. If we forget to add a license to a newly added file, the maven/python compilation process can still pass. This should be fixed.
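A check of this kind could be a small script run in CI, along the lines of the sketch below. The marker string, the file suffixes, and the number of lines inspected are assumptions; the project's actual header text may differ.

```python
from pathlib import Path
from typing import List, Sequence

# Assumed marker taken from the standard Apache 2.0 header text.
LICENSE_MARKER = "Licensed under the Apache License, Version 2.0"


def files_missing_license(
    root: str,
    suffixes: Sequence[str] = (".py", ".java"),
    head_lines: int = 20,
) -> List[str]:
    """Return source files whose first lines lack the license marker."""
    missing = []
    for path in sorted(Path(root).rglob("*")):
        if path.is_file() and path.suffix in suffixes:
            with path.open(encoding="utf-8", errors="replace") as f:
                # Read at most head_lines lines from the top of the file.
                head = "".join(line for _, line in zip(range(head_lines), f))
            if LICENSE_MARKER not in head:
                missing.append(str(path))
    return missing
```

A CI step could call this on the repository root and fail the build when the returned list is non-empty.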
Running pytest -W ignore::DeprecationWarning might fail with the "Too many open files" error.
It appears that the number of open files with "PIPE" type keeps increasing over time. This can be verified by running the following command repeatedly after starting pytest.
ps aux | grep pytest | grep local | awk '{print $2}' | xargs lsof -p | grep PIPE | wc -l
Currently in FlinkProcessor, SlidingWindowTransforms and OverWindowTransforms whose limit parameters are different are evaluated separately and joined together afterward, which adds performance overhead in some feature-generation cases. This should be optimized so that transformations with this characteristic are evaluated in the same process function and the unnecessary joins are removed.