laysakura / beam

This project forked from nivaldoh/beam

Apache Beam is a unified programming model for Batch and Streaming data processing.

Home Page: https://beam.apache.org/

License: Apache License 2.0

Shell 0.61% JavaScript 0.18% Python 17.10% C 0.01% Java 64.81% Lua 0.01% Scala 0.01% Groovy 1.46% Go 9.50% Kotlin 0.37% Dart 2.13% TypeScript 2.98% CSS 0.01% ANTLR 0.01% Thrift 0.01% HTML 0.29% HCL 0.33% FreeMarker 0.01% Jupyter Notebook 0.09% Cython 0.11%

beam's Introduction

Hi 👋, I'm @laysakura

A low-level system developer / backend engineer in Tokyo.

🔭 I’m currently working on ...

  • R&D on software related to connected vehicles at Toyota.
    • AI security
    • Privacy tech
    • SpringQL, a stream processor for IoT devices and in-vehicle computers.
    • Secure distributed stream processor using TEE
    • DataLake architecture with streaming data processing

πŸ“ I regularly write articles on ...

πŸ’¬ Ask me about ...

  • Cyber security (especially AI security)
  • Rust
  • RDBMS internals
  • Stream processing
  • Data analytics platform architectures (DWH, DataLake, ...)
  • AI accelerator

📄 Know about my experiences:

📫 How to reach me:

📈 Stats:

beam's People

Contributors

aaltay, angoenka, apilloud, aromanenko-dev, chamikaramj, charlesccychen, damccorm, davorbonaci, dependabot[bot], dhalperi, echauchot, herohde, ibzib, iemejia, ihji, jbonofre, jkff, kennknowles, lostluck, lukecwik, mxm, pabloem, reuvenlax, robertwb, swegner, tgroh, theneuralbit, tvalentyn, tweise, udim

beam's Issues

Wholistic examples

Problem

Currently, we have only a limited number of test cases.

To be

  • We should add an examples/ directory that shows how to use the Beam Rust SDK in code
  • Or add doc-tests to lib.rs

[Bug]: Removing `unsafe {}` makes `cargo test` hang

What happened?

How to reproduce

cd sdks/rust
cargo test -- --skip target/debug

After applying the diff in #16, test execution hangs at `Running tests/worker_test.rs`.
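Until the root cause is found, you can turn a hang like this into an ordinary test failure by running the suspect code under a time limit. Below is a std-only sketch. `run_with_timeout` is a hypothetical helper, not part of the SDK. It runs the work on a helper thread, and `recv_timeout` gives up after `limit`. The stuck thread cannot be killed and keeps running in the background. For async tests, `tokio::time::timeout` plays a similar role, but it can only fire if the hung future yields back to the runtime.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs `f` on a helper thread. Returns `Some(result)` if it finishes within
/// `limit`, or `None` if it appears to hang (the thread is left running).
pub fn run_with_timeout<T, F>(limit: Duration, f: F) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Sending fails only if the receiver already gave up; ignore that case.
        let _ = tx.send(f());
    });
    rx.recv_timeout(limit).ok()
}
```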

Issue Priority

Priority: 3 (minor)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

[Task]: Benchmark DashMap against Moka

What needs to happen?

See the description in #43

Issue Priority

Priority: 3 (nice-to-have improvement)

[Feature Request]: Stop using `Any` (statically-typed Pipeline)

Currently, `Any` is used everywhere, e.g.:

pub fn to_generic_dofn<T: Any, O: Any, I: IntoIterator<Item = O> + 'static>(

IMO, one of the most important features of the Beam Rust SDK should be a statically-typed Pipeline (with generics support).

I already did some work toward that in 87b9939, targeting nivaldoh's HEAD (almost the same diff as nivaldoh#26).

But we should continue this work on top of robertwb's work in nivaldoh#22 (I already hand-merged it in 9564b4e).
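The sketch below is deliberately simplified to show what a statically-typed Pipeline gains over `Any`. `PCollection<T>` here is hypothetical and holds its elements in memory only so the example runs; a real PCollection would be a handle into the pipeline graph. Because `map` infers its output type `O` from the closure, a type mismatch between transforms becomes a compile error rather than a failed downcast at run time.

```rust
/// Hypothetical, simplified PCollection: the element type `T` is part of the
/// type, so no `Any` or downcasting is needed. Elements are kept in memory
/// only to make the sketch runnable.
pub struct PCollection<T> {
    elements: Vec<T>,
}

impl<T> PCollection<T> {
    pub fn create(elements: Vec<T>) -> Self {
        PCollection { elements }
    }

    /// The output type `O` is inferred from `f`; feeding the result into a
    /// transform that expects another element type fails to compile.
    pub fn map<O, F: Fn(&T) -> O>(&self, f: F) -> PCollection<O> {
        PCollection {
            elements: self.elements.iter().map(f).collect(),
        }
    }

    pub fn elements(&self) -> &[T] {
        &self.elements
    }
}
```

For example, `let lens: PCollection<usize> = words.map(|w: &String| w.len());` compiles, whereas annotating `lens` as `PCollection<String>` is rejected by the compiler.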

[Bug]: test failures

How to reproduce

RUST_BACKTRACE=1 cargo test -- --skip target/debug
running 10 tests
test tests::primitives_test::tests::ensure_assert_fails_on_empty ... ignored
test tests::coders_test::tests::test_general_object_coder ... ok
test tests::primitives_test::tests::run_impulse_expansion ... ok
test tests::worker_test::tests::test_operator_construction ... ok
test tests::primitives_test::tests::run_direct_runner ... ok
test tests::coders_test::tests::test_standard_coders ... ok
test tests::primitives_test::tests::run_gbk ... FAILED
test tests::primitives_test::tests::ensure_assert_fails - should panic ... ok
test tests::primitives_test::tests::run_map ... FAILED
test tests::primitives_test::tests::run_flatten ... FAILED


---- tests::primitives_test::tests::run_gbk stdout ----
thread 'tests::primitives_test::tests::run_gbk' panicked at 'called `Option::unwrap()` on a `None` value', src/internals/serialize.rs:108:57
stack backtrace:
   0: rust_begin_unwind
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/std/src/panicking.rs:575:5
   1: core::panicking::panic_fmt
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/panicking.rs:64:14
   2: core::panicking::panic
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/panicking.rs:114:5
   3: core::option::Option<T>::unwrap
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/option.rs:823:21
   4: <apache_beam::internals::serialize::TypedKeyExtractor<V> as apache_beam::internals::serialize::KeyExtractor>::extract
             at ./src/internals/serialize.rs:108:24
   5: <apache_beam::worker::operators::GroupByKeyWithinBundleOperator as apache_beam::worker::operators::OperatorI>::process
             at ./src/worker/operators.rs:503:28
   6: <apache_beam::worker::operators::Operator as apache_beam::worker::operators::OperatorI>::process
             at ./src/worker/operators.rs:134:17
   7: apache_beam::worker::operators::Receiver::receive
             at ./src/worker/operators.rs:237:13
   8: <apache_beam::worker::operators::ParDoOperator as apache_beam::worker::operators::OperatorI>::process
             at ./src/worker/operators.rs:581:17
   9: <apache_beam::worker::operators::Operator as apache_beam::worker::operators::OperatorI>::process
             at ./src/worker/operators.rs:137:17
  10: apache_beam::worker::operators::Receiver::receive
             at ./src/worker/operators.rs:237:13
  11: <apache_beam::worker::operators::ParDoOperator as apache_beam::worker::operators::OperatorI>::process
             at ./src/worker/operators.rs:581:17
  12: <apache_beam::worker::operators::Operator as apache_beam::worker::operators::OperatorI>::process
             at ./src/worker/operators.rs:137:17
  13: apache_beam::worker::operators::Receiver::receive
             at ./src/worker/operators.rs:237:13
  14: <apache_beam::worker::operators::ImpulsePerBundleOperator as apache_beam::worker::operators::OperatorI>::start_bundle
             at ./src/worker/operators.rs:450:13
  15: <apache_beam::worker::operators::Operator as apache_beam::worker::operators::OperatorI>::start_bundle
             at ./src/worker/operators.rs:114:46
  16: apache_beam::worker::sdk_worker::BundleProcessor::process::{{closure}}
             at ./src/worker/sdk_worker.rs:378:13
  17: <apache_beam::runners::direct_runner::DirectRunner as apache_beam::runners::runner::RunnerI>::run_pipeline::{{closure}}
             at ./src/runners/direct_runner.rs:84:51
  18: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/future/future.rs:125:9
  19: apache_beam::runners::runner::RunnerI::run_async::{{closure}}
             at ./src/runners/runner.rs:64:41
  20: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/future/future.rs:125:9
  21: apache_beam::runners::runner::RunnerI::run::{{closure}}
             at ./src/runners/runner.rs:48:33
  22: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/future/future.rs:125:9
  23: apache_beam::tests::primitives_test::tests::run_gbk::{{closure}}
             at ./src/tests/primitives_test.rs:94:13
  24: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/future/future.rs:125:9
  25: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/future/future.rs:125:9
  26: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
             at /Users/sho.nakatani/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/runtime/scheduler/current_thread.rs:525:48
  27: tokio::coop::with_budget::{{closure}}
             at /Users/sho.nakatani/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/coop.rs:102:9
  28: std::thread::local::LocalKey<T>::try_with
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/std/src/thread/local.rs:446:16
  29: std::thread::local::LocalKey<T>::with
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/std/src/thread/local.rs:422:9

...
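The panic is an `unwrap()` on `None` at `src/internals/serialize.rs:108`, inside `TypedKeyExtractor::extract`. The backtrace alone does not say why. One plausible source is a failed `downcast_ref`, where the element's concrete type differs from the one the extractor expects. A hypothetical helper like the one below reports the expected type instead of a bare `unwrap` panic, which would make such failures easier to diagnose. One pitfall worth checking: passing `&boxed` where `boxed: Box<dyn Any>` can coerce the `Box` itself to `&dyn Any`, so downcasting to the inner type then returns `None`. Use `&*boxed` instead.

```rust
use std::any::{type_name, Any};

/// Downcasts a type-erased element, returning an error that names the
/// expected type instead of panicking on `Option::unwrap()`.
pub fn downcast_or_err<T: Any>(element: &dyn Any) -> Result<&T, String> {
    element
        .downcast_ref::<T>()
        .ok_or_else(|| format!("expected element of type {}", type_name::<T>()))
}
```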

[Task]: refactor: separate worker and pipeline construction module

What needs to happen?

IMO, `mod internals` is a poorly scoped module; worker code and pipeline-construction code should live in separate modules.

Issue Priority

Priority: 3 (nice-to-have improvement)

[Task]: Pipeline construction helper macro

What needs to happen?

Currently, we write pipelines like this:

In main:

#[tokio::main]
async fn main() {
    DirectRunner::new()
        .run(|root| {
            root.apply(...)
        })
        .await;
}

In tests:

#[tokio::test]
async fn run_flatten() {
    DirectRunner::new()
        .run(|root| {
            // Two inputs are flattened and checked regardless of element order.
            let first = root.apply(Create::new(vec![1, 2, 3]));
            let second = root.apply(Create::new(vec![100, 200]));
            PValue::new_array(&[first, second])
                .apply(Flatten::new())
                .apply(AssertEqualUnordered::new(&[1, 2, 3, 100, 200]))
        })
        .await;
}


As you can see, there is a bit of boilerplate code.

We may want to write it like this instead:

#[apache_beam::main]
fn main(root: PValue) {
  root.apply(...);
}
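An attribute macro like `#[apache_beam::main]` has to live in a separate proc-macro crate. As an interim step, a declarative `macro_rules!` macro can already hide the runner call. The sketch below uses stand-in `Root` and `DirectRunner` types and a hypothetical `beam_pipeline!` macro so it compiles on its own. These stand-ins are synchronous, whereas the SDK's `DirectRunner::run` is async and needs `.await`.

```rust
// Hypothetical stand-ins for SDK types so the sketch compiles by itself.
pub struct Root;
pub struct DirectRunner;

impl DirectRunner {
    pub fn new() -> Self {
        DirectRunner
    }

    // Synchronous stand-in: invokes the pipeline-building closure with a root.
    pub fn run<F: FnOnce(&Root) -> R, R>(self, f: F) -> R {
        f(&Root)
    }
}

/// Expands `beam_pipeline!(|root| { ... })` into the runner boilerplate.
macro_rules! beam_pipeline {
    (|$root:ident| $body:block) => {
        DirectRunner::new().run(|$root: &Root| $body)
    };
}
```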

The task should be like:

  1. First, survey other Beam SDKs and popular crates that provide such macros, then write a design document (in Google Docs, and add it to the wiki page).
  2. Request a review from other contributors (@laysakura and/or @sjvanrossum would be happy to review it).
  3. Develop it.

Issue Priority

Priority: 3 (nice-to-have improvement)

[Task]: Pipeline testing framework

What needs to happen?

We already have some transforms for making assertions, such as:

.apply(AssertEqualUnordered::new(&[1]))

However, we may lack some test transforms that other SDKs provide, and we might be able to develop a more sophisticated framework for testing pipelines.
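For reference, the check behind an assertion like AssertEqualUnordered can be expressed as a multiset comparison. `equal_unordered` below is a hypothetical standalone helper, not an SDK function. It counts occurrences in a `HashMap`, so duplicate counts must match too, not just set membership.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Returns true if `actual` and `expected` contain the same elements with
/// the same multiplicities, in any order.
pub fn equal_unordered<T: Eq + Hash>(actual: &[T], expected: &[T]) -> bool {
    if actual.len() != expected.len() {
        return false;
    }
    let mut counts: HashMap<&T, i64> = HashMap::new();
    for x in actual {
        *counts.entry(x).or_insert(0) += 1;
    }
    for x in expected {
        *counts.entry(x).or_insert(0) -= 1;
    }
    // Every element must cancel out exactly.
    counts.values().all(|&c| c == 0)
}
```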

The task should be like:

  1. First, survey other SDKs and write a design document (in Google Docs, and add it to the wiki page) so that we share a common picture of the Rust SDK's next test framework.
  2. Request a review from other contributors (@laysakura and/or @sjvanrossum would be happy to review it).
  3. Develop it.

Issue Priority

Priority: 3 (nice-to-have improvement)

[Feature Request]: Design adequate yet flexible ways to serialize/deserialize DoFn payloads

What would you like to happen?

During pipeline execution, the bundle processor retrieves a process bundle descriptor, which contains descriptors for transforms, collections, windowing strategies, coders and other relevant descriptors to execute a pipeline. Reconstructing the pipeline graph and runtime types on a worker might have unintended side effects due to variations in the execution environment (e.g. hostname, RNGs, user names, etc.). The graph and all its types as it existed at pipeline construction time must be exactly replicable at pipeline execution time. The FnAPI BeamFnControl service enables FnAPI SDKs to retrieve serialized representations of all relevant objects to make this work.

Rust lacks conveniences such as Java's wildcard generics, which let all implementations of a generic trait be treated as the same type. Stable Rust also does not ship with a usable RTTI system, which could otherwise be used to facilitate (de)serialization. We thus need ways to specify both typed and untyped views of all objects.
For example, take this oversimplified DoFn:

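use std::any::Any;

// Typed view: input and output types are known at compile time.
trait DoFn {
  type Input: Any;
  type Output: Any;

  fn process_element(&mut self, element: &Self::Input) -> Self::Output;
}

// Untyped view: elements are type-erased behind `dyn Any`.
trait UntypedDoFn {
  fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any>;
}

struct MyDoFn;

impl DoFn for MyDoFn {
  type Input = String;
  type Output = String;

  fn process_element(&mut self, element: &Self::Input) -> Self::Output {
    element.to_uppercase()
  }
}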

We can provide a blanket UntypedDoFn implementation for all typed DoFns like so:

impl<T: DoFn> UntypedDoFn for T {
  fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any> {
    // Recover the concrete input type; a mismatch is a programming error.
    let input = element
      .downcast_ref::<T::Input>()
      .expect("element type does not match DoFn::Input");
    Box::new(self.process_element(input))
  }
}

This works fine until we need to serialize the DoFn and deserialize it on another machine.
Setting aside that the serde crate is the most likely pick for serialization/deserialization and that an efficient binary format like bincode would be preferred, how do we know which struct type the serialized data represents, and which UntypedDoFn vtable is required to operate on it? Unlike in Java, for example, type registries with registered deserializers do not ship as part of the runtime. A fair number of crates solve this, such as bevy_reflect, typetag and serde_traitobject. bevy_reflect and typetag (stable features) both use type registries: Bevy requires users to create and register types manually, whereas typetag relies on the inventory and ctor crates to build the registry automatically using life before main. Lastly, serde_traitobject (nightly features) encodes the struct and trait type id into the serialized data, which offers the most flexibility since explicit type registration (which has its own downsides) is not necessary.
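A hand-rolled version of the registry approach might look like the sketch below. It is closest in spirit to Bevy's manual registration; typetag would populate the map automatically. Everything here is hypothetical and std-only. The type tag is a plain string, and the payload is raw UTF-8 bytes standing in for a serde/bincode encoding.

```rust
use std::any::Any;
use std::collections::HashMap;

/// Untyped view of a DoFn, as in the issue text.
pub trait UntypedDoFn {
    fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any>;
}

/// Rebuilds a concrete DoFn behind a trait object from its payload.
pub type Deserializer = fn(&[u8]) -> Box<dyn UntypedDoFn>;

/// Hypothetical registry keyed by a type tag written at construction time.
#[derive(Default)]
pub struct Registry {
    by_tag: HashMap<&'static str, Deserializer>,
}

impl Registry {
    pub fn register(&mut self, tag: &'static str, de: Deserializer) {
        self.by_tag.insert(tag, de);
    }

    /// Returns `None` when the worker never registered `tag`.
    pub fn deserialize(&self, tag: &str, payload: &[u8]) -> Option<Box<dyn UntypedDoFn>> {
        self.by_tag.get(tag).map(|de| de(payload))
    }
}

/// Example DoFn whose state (`suffix`) must survive the round trip.
pub struct AppendSuffix {
    pub suffix: String,
}

impl AppendSuffix {
    pub const TAG: &'static str = "AppendSuffix";

    /// Construction side: emit (tag, payload).
    pub fn to_bytes(&self) -> (&'static str, Vec<u8>) {
        (Self::TAG, self.suffix.as_bytes().to_vec())
    }

    /// Worker side: rebuild the DoFn from the payload.
    pub fn from_bytes(payload: &[u8]) -> Box<dyn UntypedDoFn> {
        let suffix = String::from_utf8(payload.to_vec()).expect("payload is UTF-8");
        Box::new(AppendSuffix { suffix })
    }
}

impl UntypedDoFn for AppendSuffix {
    fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any> {
        let s = element.downcast_ref::<String>().expect("input must be a String");
        Box::new(format!("{}{}", s, self.suffix))
    }
}
```

The key design point is that the tag travels with the payload, and the worker must register the same tag. This is exactly the bookkeeping that typetag automates and serde_traitobject avoids.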

It is currently impossible (for good reason) to have typetag register generic types and traits (including traits with associated types), but we can work around this with a custom macro: annotating a DoFn or Coder struct with it would expand into an entirely non-generic UntypedDoFn implementation, like:

impl UntypedDoFn for MyDoFn {
  fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any> {
    Box::new(<Self as DoFn>::process_element(self, element.downcast_ref::<<Self as DoFn>::Input>().unwrap()))
  }
}
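Below is a sketch of that workaround as a declarative `macro_rules!` macro. The issue envisions an annotation, which implies a proc-macro attribute, so `impl_untyped_dofn!` is a hypothetical stand-in. Each invocation names one concrete type, so the generated impl contains no generic parameters.

```rust
use std::any::Any;

pub trait DoFn {
    type Input: Any;
    type Output: Any;
    fn process_element(&mut self, element: &Self::Input) -> Self::Output;
}

pub trait UntypedDoFn {
    fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any>;
}

/// Expands to a concrete (non-generic) `UntypedDoFn` impl for one named type.
macro_rules! impl_untyped_dofn {
    ($ty:ty) => {
        impl UntypedDoFn for $ty {
            fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any> {
                let input: &<$ty as DoFn>::Input = element
                    .downcast_ref()
                    .expect(concat!("wrong input type for ", stringify!($ty)));
                Box::new(<$ty as DoFn>::process_element(self, input))
            }
        }
    };
}

pub struct MyDoFn;

impl DoFn for MyDoFn {
    type Input = String;
    type Output = String;
    fn process_element(&mut self, element: &String) -> String {
        element.to_uppercase()
    }
}

impl_untyped_dofn!(MyDoFn);
```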

One topic that requires some care is utility DoFns and the desire to use modern patterns for simple Filter/FlatMap/Map transforms. Something like collection.map(|element: &String| -> String { element.to_uppercase() }) becomes impossible to write, because the closure's type can't be represented without generics. A clever way to hide this could be to write proc macros that unpack a closure expression's body and rewrite it as a struct with the necessary trait implementations, so that this can be expressed as collection.apply(Map!(|element: &String| -> String { element.to_uppercase() })). Utility functions like fn map<I, O, F: Fn(&I) -> O>(f: F) could potentially ship as part of a trait in a separate crate that adds them to PCollection, with serialization handled by e.g. serde_traitobject. This would let us ship Beam's core as a crate that uses only stable features, while allowing users to explicitly opt into better developer ergonomics with unstable Rust features.
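The rewrite-a-closure-as-a-struct idea can be approximated with a declarative macro. `map_fn!` below is a hypothetical stand-in for the proposed `Map!` proc macro. It expands into a block that declares a unit struct, implements `DoFn` for it using the closure's annotated argument and return types, and evaluates to an instance. Unlike a proc macro, it requires both type annotations and cannot add serialization support by itself.

```rust
use std::any::Any;

pub trait DoFn {
    type Input: Any;
    type Output: Any;
    fn process_element(&mut self, element: &Self::Input) -> Self::Output;
}

/// Rewrites `|arg: &In| -> Out { body }` into a unit struct implementing `DoFn`.
macro_rules! map_fn {
    (|$arg:ident : &$in:ty| -> $out:ty $body:block) => {{
        // Scoped to this block, so each invocation gets its own struct.
        struct MapFn;
        impl DoFn for MapFn {
            type Input = $in;
            type Output = $out;
            fn process_element(&mut self, $arg: &$in) -> $out $body
        }
        MapFn
    }};
}
```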

To complete this issue, we need to agree on whether to build a stable-only core SDK with nightly features in separate crates, or to embrace nightly features in the core SDK to provide more ergonomic ways of expressing pipelines.

Issue Priority

Priority: 3 (nice-to-have improvement)

[Task]: Use turmoil to simulate client and server environments in tests

What needs to happen?

See the description in #43

Issue Priority

Priority: 3 (nice-to-have improvement)

[Task]: feat: support Coder with components (internal type parameters)

What needs to happen?

see: https://github.com/laysakura/beam/pull/36/files#diff-4b1a1a51359ed219262fed36be0a8aaef47e373b33538073affcec8daf32d2eaR22

Constructing a concrete Coder from a CoderUrnTree may solve this.
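Here is a minimal sketch of that idea. It assumes `CoderUrnTree` is a URN plus a list of component sub-trees; the field names are hypothetical. The builder recurses into the components first, then assembles the composite coder, rejecting unknown URNs or wrong component counts. The URNs are Beam's standard coder URNs. The `Coder` enum is illustrative only and does no encoding.

```rust
/// Hypothetical shape of a coder URN tree: a URN plus component sub-trees.
pub struct CoderUrnTree {
    pub urn: &'static str,
    pub components: Vec<CoderUrnTree>,
}

/// Illustrative concrete coders; composite coders own their components.
#[derive(Debug, PartialEq)]
pub enum Coder {
    StrUtf8,
    VarInt,
    Kv(Box<Coder>, Box<Coder>),
    Iterable(Box<Coder>),
}

/// Builds the components bottom-up, then the coder for this node's URN.
pub fn build_coder(tree: &CoderUrnTree) -> Result<Coder, String> {
    let comps = tree
        .components
        .iter()
        .map(build_coder)
        .collect::<Result<Vec<_>, _>>()?;
    let mut it = comps.into_iter();
    match (tree.urn, it.len()) {
        ("beam:coder:string_utf8:v1", 0) => Ok(Coder::StrUtf8),
        ("beam:coder:varint:v1", 0) => Ok(Coder::VarInt),
        ("beam:coder:kv:v1", 2) => {
            let key = it.next().unwrap();
            let value = it.next().unwrap();
            Ok(Coder::Kv(Box::new(key), Box::new(value)))
        }
        ("beam:coder:iterable:v1", 1) => Ok(Coder::Iterable(Box::new(it.next().unwrap()))),
        (urn, n) => Err(format!("unsupported coder {} with {} components", urn, n)),
    }
}
```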

Issue Priority

Priority: 3 (nice-to-have improvement)

How to contribute

Coming from apache#21089 (comment), I noticed that you (@laysakura) welcome contributions! I'm interested in contributing, but wondering how you would prefer to handle contributions.

Do you already have plans regarding the direction/priorities, or should I just go hunting for todos in the code that seem like something I could work on? Would you like an issue created first, or rather not be bothered until (I think) I have something ready for review?
