comnik / declarative-dataflow


A reactive query engine built on differential dataflow.

License: MIT License

Language: Rust 100.00%
Topics: datalog differential-dataflows relational reactive-query-engine graphql

declarative-dataflow's Introduction

Declarative Dataflow

A reactive query engine built on differential dataflow.


Features

Interactive use: Declarative accepts queries expressed in a Datalog-inspired binding language and turns them into differential dataflows dynamically and at runtime. This works equally well as a library embedded into a specific application or as a standalone service (e.g. via the included WebSocket server).

Reactive relational queries: Declarative provides a relational query language, with full support for traditional binary joins, worst-case optimal n-way joins, antijoins, various aggregates, predicates, unions, and projections. Queries are made up of rules, which can depend on each other (or recursively on themselves). Query results are updated incrementally as inputs change.

[WIP] Reactive GraphQL queries: Declarative also comes with built-in support for GraphQL-like queries, for a more document-oriented usage model.

Pluggable sinks and sources: Declarative can be extended to read data from and write results back to external systems, such as Kafka or Datomic, as well as static sources such as CSV files.

Pluggable frontends: Languages such as Datalog and SQL can be easily implemented on top of Declarative. Well, maybe not easily, but easier than without. A Datalog frontend is provided in Clojure(Script).

Thanks to Differential Dataflow, all these capabilities are provided within the dataflow model and can thus be scaled out to multiple independent workers. Declarative is less efficient and much more opinionated than hand-written Differential Dataflow. In particular, it enforces a fully-normalized, RDF-like data model heavily inspired by systems like Datomic or LogicBlox. Other than that, Declarative is just Differential Dataflow under the hood and can happily co-exist and interact with static, handwritten dataflows.

Included in this repository are the library itself, a server, and a CLI.

Build

The library is built using cargo. A sufficiently up-to-date Rust toolchain is enforced via the rust-toolchain file included in this repository.

Declarative Dataflow makes use of the log crate. Logging at a specific level can be enabled by setting the RUST_LOG environment variable, e.g. RUST_LOG=declarative_dataflow=<level>.
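
For example, to run the included server binary with info-level logging (assuming the binary is named server, as src/bin/server.rs suggests):

    RUST_LOG=declarative_dataflow=info cargo run --bin server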

Documentation

Crate documentation is available on crates.io.

Important architectural decisions are documented in the docs/adr/ sub-directory.

Documentation for this package can be built via cargo doc --no-deps and viewed in a browser via cargo doc --no-deps --open. Please refer to declarative_dataflow::plan::Plan for documentation on the available operators. The tests/ directory contains usage examples.

Clients

Declarative Dataflow servers accept a lower-level relational query plan representation that is meant to be somewhat frontend agnostic. However, query plans are rather cumbersome to write manually and do not map to any interesting, higher-level semantics. We therefore currently provide a Datalog frontend written in Clojure.

Please refer to the documentation for an overview of the supported query plans.

Further Reading / Watching

A post on the high-level motivation for this project.

[video] Reactive Datalog For Datomic, recorded at Clojure/conj 2018.

[video] 3DF: Reactive Datalog For Datomic, recorded at :clojureD 2019.

The Clockworks blog has a number of posts on Declarative.

declarative-dataflow's People

Contributors

comnik, frankmcsherry, li1


declarative-dataflow's Issues

User transactable attributes may halt progress if sources dictate times

Just ran into this. 0561422 introduced advance_domain_to_source(), which enables sources to dictate times. Internally we use the domain probe to gather frontier information about all attributes in one domain. This frontier information is then used to advance() all kinds of things. We probe source-created as well as user-created attributes. Source attribute sessions are flushed automatically (auto_flush) as soon as the source downgrades its capability; user-created attribute sessions are not. This leads to the problem that the frontier gets stuck at the user's transaction. advance_source_to_domain() is unable to advance based on the domain's frontier information, as it is held back by the user-created attribute.

One solution is to simply not probe user-created, transactable attributes. Transactions to the user attributes are given to their sessions at the correct current epoch. One thing that arises then is that user input is bound to source timestamps...

Server module should only expose request handling

The server module currently exposes the server's handler functions individually. This invites specific server implementations to accidentally use them without going through the sequencer first, breaking determinism across workers.

Plan::Inspect stage

An inspect stage that logs either the actual tuples or the number of tuples flowing through a given plan stage.
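
For reference, differential dataflow already exposes an inspect operator on collections; a Plan::Inspect stage would essentially wrap behaviour like the following minimal sketch (an illustration only, not the proposed implementation):

    extern crate timely;
    extern crate differential_dataflow;

    use differential_dataflow::input::Input;

    fn main() {
        timely::execute_from_args(std::env::args(), |worker| {
            let mut input = worker.dataflow::<u64, _, _>(|scope| {
                let (input, tuples) = scope.new_collection();
                // Log each tuple together with its timestamp and diff as it
                // flows through this stage of the dataflow.
                tuples.inspect(|x: &((u64, String), u64, isize)| println!("inspect: {:?}", x));
                input
            });

            input.insert((100, "hello".to_string()));
            input.advance_to(1);
            input.flush();
        }).unwrap();
    }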

Proof-of-concept: Historical queries

We should experiment with something similar to Datomic's as-of queries and explore what implications they would have for indexing and overall performance.

Infinite loop in is_any_outdated()

is_any_outdated() uses an internal timer which advance_domain() changes. If we probe a dataflow that has not started or has no current inputs, we might run into an infinite loop in the server at worker.step_while(|| server.is_any_outdated()), because we have already advanced the inputs but the probe still only sees the "old" timestamps.

CardinalityOne not correct for partially-ordered times

The state machine powering the CardinalityOne operator currently assumes totally ordered input timestamps, s.t. when processed in time order (i.e. in the order that consolidate will reveal them), all keys end up in the correct state.

In fact, the current implementation is a bit stronger, in that it should still produce correct results on bitemporal inputs, as long as event times increase monotonically per key. This is due to an additional sort by event time after consolidation.
For general, partially ordered inputs this is no longer safe.

A solution might be to re-implement CardinalityOne as a special kind of arrange. This way we could use the trace handle to look up the correct last value as of the input time, without having to maintain two traces per attribute.

The existing implementation should get rid of the additional sort and live on as CardinalityOneTotal, or something like that.
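
As a plain-Rust caricature of the per-key logic under the totally ordered assumption (not the operator's actual code): for each key we keep the value at the maximal time, which is only well defined when any two times are comparable.

    use std::collections::HashMap;

    // Keep, per key, the value carried by the latest assertion. With partially
    // ordered times there may be no unique "latest" assertion per key, so this
    // logic no longer applies.
    fn latest_values(updates: &[(String, String, u64)]) -> HashMap<String, String> {
        let mut state: HashMap<String, (String, u64)> = HashMap::new();
        for (key, value, time) in updates {
            match state.get(key) {
                Some((_, kept)) if kept >= time => {} // an equal or later assertion already wins
                _ => { state.insert(key.clone(), (value.clone(), *time)); }
            }
        }
        state.into_iter().map(|(key, (value, _))| (key, value)).collect()
    }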

TopK Operator

We are often interested in the top k results, ordered by some symbol, i.e. in a combination of ORDER_BY and LIMIT. Without a limit, sorting isn't really something we can do more efficiently than the client itself could.
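
As a plain-Rust illustration of the intended semantics, i.e. ORDER_BY plus LIMIT over a finished result set (the actual operator would have to maintain this incrementally as inputs change):

    // Order tuples by the value bound to some symbol and keep only the first k.
    fn top_k<T, K: Ord>(mut tuples: Vec<T>, k: usize, key: impl Fn(&T) -> K) -> Vec<T> {
        tuples.sort_by_key(|t| key(t));
        tuples.truncate(k);
        tuples
    }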

Add documentation for `Plan` members.

The members of

pub enum Plan {
    /* members */
}

don't have any documentation, and to the extent that they (or their descendants) are the public surface of the query language, they should explain what they each do. I'm happy to take a stab at that!

I have a related goal of making each of them into struct types, and attempting to have each of them implement a trait that corresponds to create_inputs and implement_plan, so that we can break apart i. the set of query operators from ii. the implementation of each query operator. With your blessing, I can take a swing at that at the same time.
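
For illustration, a rough sketch of that proposed shape, with hypothetical names and deliberately simplified signatures (the real create_inputs / implement_plan would need scope, context, and return types from the crate):

    // Hypothetical sketch only: one struct per query operator, plus a trait
    // separating "which operators exist" from "how each one is implemented".
    pub trait QueryOperator {
        /// Declare any inputs this operator needs before the dataflow is built.
        fn create_inputs(&self) -> Vec<String>;
        /// Translate this operator into its differential dataflow fragment.
        fn implement_plan(&self);
    }

    /// Example: projection as a standalone, documentable struct.
    pub struct Project {
        /// The symbols (variables) to retain in the output relation.
        pub variables: Vec<String>,
    }

    impl QueryOperator for Project {
        fn create_inputs(&self) -> Vec<String> { Vec::new() }
        fn implement_plan(&self) { /* map input tuples down to `variables` */ }
    }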

Subtraction overflow on transactions at t0

One issue here is that AttributeConfig::real_time should probably use zero trace_slack anyway.

But probably we should only honour the trace_slack if the domain has advanced far enough for the subtraction not to overflow.
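
A minimal sketch of that guard, assuming the domain time and the trace_slack are plain u64 offsets (names here are illustrative, not the actual fields):

    // Honour trace_slack only once the domain has advanced far enough for the
    // subtraction not to overflow; before that, fall back to time zero.
    fn compaction_frontier(domain_time: u64, trace_slack: u64) -> u64 {
        domain_time.checked_sub(trace_slack).unwrap_or(0)
    }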

master#3b21db9 "imports can only refer to extern crate names passed with `--extern` on stable channel"

On master 3b21db9, I'm not able to compile using either stable or nightly. Seeing these errors:

   Compiling timely_communication v0.8.0 (https://github.com/TimelyDataflow/timely-dataflow#74c13563)
   Compiling timely v0.8.0 (https://github.com/TimelyDataflow/timely-dataflow#74c13563)
   Compiling url v1.7.2
error[E0658]: imports can only refer to extern crate names passed with `--extern` on stable channel (see issue #53130)
  --> /Users/petrus/.cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/74c1356/src/lib.rs:70:9
   |
70 | pub use execute::{execute, execute_directly, execute_from_args, example};
   |         ^^^^^^^
...
94 | pub mod execute;
   | ---------------- not an extern crate passed with `--extern`
   |
   = help: add #![feature(uniform_paths)] to the crate attributes to enable
note: this import refers to the module defined here
  --> /Users/petrus/.cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/74c1356/src/lib.rs:94:1
   |
94 | pub mod execute;
   | ^^^^^^^^^^^^^^^^

error[E0658]: imports can only refer to extern crate names passed with `--extern` on stable channel (see issue #53130)
  --> /Users/petrus/.cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/74c1356/src/lib.rs:71:9
   |
71 | pub use order::PartialOrder;
   |         ^^^^^
...
95 | pub mod order;
   | -------------- not an extern crate passed with `--extern`
   |
   = help: add #![feature(uniform_paths)] to the crate attributes to enable
note: this import refers to the module defined here
  --> /Users/petrus/.cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/74c1356/src/lib.rs:95:1
   |
95 | pub mod order;
   | ^^^^^^^^^^^^^^

   Compiling ws v0.7.8 (https://github.com/comnik/ws-rs#fdf5a291)
error: aborting due to 2 previous errors

For more information about this error, try `rustc --explain E0658`.
error: Could not compile `timely`.
warning: build failed, waiting for other jobs to finish...
error: build failed

Named inputs

Add support for named query inputs and expose additional input handles to them. This would allow parameterizing a registered query without re-registering it.

WebSocket Server Cleanup

  • Handle all connection events
  • Clean client interests on disconnect
  • Move to mio-extras
  • Fix the exception on attempting to send results to clients that have disconnected

Proper connection management

The server currently supports only a single connection, which might not be the number necessary for production use.

Expose freeze operator

Differential's freeze operator allows us to do as-of style queries. Those are not super interesting in a streaming setting, but can be very helpful when sharing results via URI. In the multi-temporal case, it might be interesting to freeze one dimension and let the other one run.

Error handling

Currently we panic if anything is out of order, which shuts down the whole process. In the future we need a more resilient structure that notifies clients via the WebSocket when a computation in a single dataflow has gone haywire, but keeps the other dataflows running.

Memory leak with logging enabled

With --enable-logging, the server struct holds on to EventLinks for Timely and Differential events, so those never drop elements.

Re-use arrangements from NameExpr

NameExpr prevents re-use of arrangements, because they get wrapped into collections. With the introduction of Implemented, we can now implement something like ArrangedRelation, to fix this.

Input source for commands

It will often be the case that certain dataflows should always run. After a cluster crash / shutdown, such queries currently have to be re-registered manually. I therefore propose adding a new, special input source that can read server commands from a durable source of truth.

Build errors?

A fresh clone of the repo and cargo run on the latest stable channel fail with the following errors:

error[E0599]: no method named `register_source` found for type `declarative_dataflow::server::Server<u64, mio::Token>` in the current scope
   --> src/bin/server.rs:626:60
    |
626 |                                 if let Err(error) = server.register_source(source, scope) {
    |                                                            ^^^^^^^^^^^^^^^
error[E0599]: no method named `register_sink` found for type `declarative_dataflow::server::Server<u64, mio::Token>` in the current scope
   --> src/bin/server.rs:633:60
    |
633 |                                 if let Err(error) = server.register_sink(req, scope) {
    |                                                            ^^^^^^^^^^^^^

Is it just me?

Don't want to lead us down the wrong track but I noticed I have a new Cargo.lock file. I think that means I could have different versions of dependencies?

Aggregate and group by same symbol

When we specify the same symbol as both key and aggregation symbol, we get an index-out-of-bounds error.

:find ?e ?amount (count ?amount)
...

Even though such a query does not make sense, we should guard the user against it.

The problem occurs when calling tuples_by_symbols and subsequently trying to access the now-empty value vector.
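
A hedged sketch of such a guard, with symbols simplified to plain strings (not the crate's actual types): reject the plan before tuples_by_symbols ever runs.

    // Reject aggregations that reuse a grouping key as an aggregation input,
    // instead of indexing into an empty value vector later on.
    fn validate_aggregation(key_symbols: &[&str], agg_symbols: &[&str]) -> Result<(), String> {
        for sym in agg_symbols {
            if key_symbols.contains(sym) {
                return Err(format!("symbol {} used both as key and as aggregation input", sym));
            }
        }
        Ok(())
    }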

Lazy Sources

We should split up RegisterSource in the same vein as RegisterQuery + Interest. This would allow us to store source configurations and either synthesize them in combination with a query or into an attribute.

The former would allow us to create purely streaming queries with no historical state.

Selective indexing based on schema

Currently, all tuples are added to every index. This should be configurable via the schema, akin to something like the :db/index property in Datomic.

Advancing traces

Currently, index traces are never advanced and will thus never compact accumulated changes. This should probably not be the case. The idea was floated of optionally maintaining such an index structure separately, in order to support historical queries.

Generalize `Register` request

What do you think about generalizing the Register request, which is currently

    Register { query_name: String, plan: Plan, rules: Vec<Rule> },

to something like

    Register { public: Vec<Rule>, private: Vec<Rule> }

where the intent is that all rules in public are published, and all rules in private are locally available but not published.

Add remaining index orders

We currently support four out of the seven possible indexes for [e a v] tuples ((e a) -> v, e -> (a v), a -> (e v), and (a v) -> e). The remaining ones should be added once #7 lands.

rustfmt

Run (and keep using) rustfmt real soon now 🙃

Edit: Also, think about using clippy

Remove unnecessary recursive collections

It might make sense to have an option for the frontend to annotate Rules as not requiring recursion. This would allow us to avoid creating a Variable and to create a Collection directly.

Only sequence commands that affect the dataflow

Right now, all commands are passed through the sequencer, even though some of them do not affect the dataflow and the work is only done by the owning worker anyway. In particular, Transact and Register should eventually just side-step the sequencer.

Edit: as long as we don't have to for performance reasons, why risk subtle problems here?

Fix aggregations and extend set of built-in functions

  • COUNT is actually SUM at the moment, because I am an idiot
  • MIN and MAX do not group correctly
  • No support for something like Datomic's :with clause
  • Potentially hairy type conversions
  • Aggregation might re-order relations, violating any underlying projection
  • COUNT-DISTINCT
  • AVG
  • MEDIAN
  • VARIANCE
  • STDDEV
  • Floating points

Datomic also offers a couple of things returning collections, some of which (e.g. distinct) directly map to Differential operators and should be treated as such.

Clarify data model assumptions

The intent of this project is to be frontend agnostic. Therefore any special assumptions relating to e.g. Datalog should be handled within the respective front end.

Right now there is at least an assumption on distinct collections wired in, which means we do have some implicit data model assumptions. That should eventually get migrated to the front end too, for people who want relations to be distinct, without harming the folks who want multiplicities.

100% CPU usage during idle

Not sure whether this is Timely-related, WebSockets, or something about polling, but I'm seeing the release executable use 100% CPU while idle. Is this expected?

Probably this loop

Replace CLI code with separate ctl tool

The extra bits for optionally accepting command line arguments (with --enable-cli enabled) directly from within the server were useful for a while but don't really make sense in the long run.
