
vmware / database-stream-processor


Streaming and Incremental Computation Framework

License: Other

Rust 87.58% Shell 1.20% HTML 0.03% CSS 0.02% TypeScript 10.10% Dockerfile 0.09% Python 0.89% Scheme 0.03% Makefile 0.04% JavaScript 0.02%

database-stream-processor's Introduction

License: MIT

Latest News

Development of this project has moved to https://github.com/feldera/dbsp.

Database Stream Processor

Database Stream Processor (DBSP) is a framework for computing over data streams that aims to be more expressive and performant than existing streaming engines.

DBSP Mission Statement

Computing over streaming data is hard. Streaming computations operate over changing inputs and must continuously update their outputs as new inputs arrive. They must do so in real time, processing new data and producing outputs with a bounded delay.

We believe that software engineers and data scientists who build streaming data pipelines should not be exposed to this complexity. They should be able to express their computations as declarative queries and use a streaming engine to evaluate these queries correctly and efficiently. DBSP aims to be such an engine. To this end we set the following high-level objectives:

  1. Full SQL support and more. While SQL is just the first of potentially many DBSP frontends, it offers a reference point to characterize the expressiveness of the engine. Our goal is to support the complete SQL syntax and semantics, including joins and aggregates, correlated subqueries, window functions, complex data types, time series operators, UDFs, etc. Beyond standard SQL, DBSP supports recursive queries, which arise for instance in graph analytics problems.

  2. Scalability in multiple dimensions. The engine scales with the number and complexity of queries, streaming data rate and the amount of state the system maintains in order to process the queries.

  3. Performance out of the box. The user should be able to focus on the business logic of their application, leaving it to the system to evaluate this logic efficiently.

Theory

The above objectives can only be achieved by building on a solid mathematical foundation. The formal model that underpins our system, also called DBSP, is described in the accompanying paper:

The model provides two things:

  1. Semantics. DBSP defines a formal language of streaming operators and queries built out of these operators, and precisely specifies how these queries must transform input streams to output streams.

  2. Algorithm. DBSP also gives an algorithm that takes an arbitrary query and generates a dataflow program that implements this query correctly (in accordance with its formal semantics) and efficiently. Efficiency here means, in a nutshell, that the cost of processing a set of input events is proportional to the size of the input rather than the entire state of the database.

DBSP Concepts

DBSP unifies two kinds of streaming data: time series data and change data.

  • Time series data can be thought of as an infinitely growing log indexed by time.

  • Change data represents updates (insertions, deletions, modifications) to some state modeled as a table of records.

In DBSP, a time series is just a table where records are only ever added and never removed or modified. As a result, this table can grow unboundedly; hence most queries work with subsets of the table within a bounded time window. DBSP does not need to wait for all data within a window to become available before evaluating a query (although the user may choose to do so): like all queries, time window queries are updated on the fly as new inputs become available. This means that DBSP can work with arbitrarily large windows as long as they fit within available storage.

DBSP queries are composed of the following classes of operators that apply to both time series and change data:

  1. Per-record operators that parse, validate, filter, transform data streams one record at a time.

  2. The complete set of relational operators: select, project, join, aggregate, etc.

  3. Recursion: Recursive queries express iterative computations, e.g., partitioning a graph into strongly connected components. Like all DBSP queries, recursive queries update their outputs incrementally as new data arrives.

In addition, DBSP supports windowing operators that group time series data into time windows, including various forms of tumbling and sliding windows, windows driven by watermarks, etc.

Architecture

The following diagram shows the architecture of the DBSP platform. Solid blocks indicate components that we are currently working on; white blocks with dashed borders are on our TODO list.

DBSP architecture

The DBSP core engine is written in Rust and provides a Rust API for building data-parallel dataflow programs by instantiating and connecting streaming operators. Developers can use this API directly to implement complex streaming queries. We are also developing a compiler from SQL to DBSP that enables engineers and data scientists to use the engine via a familiar query language. In the future, we will add DBSP bindings for languages like Python and Scala.

At runtime, DBSP can consume inputs from and send outputs to event streams (e.g., Kafka), databases (e.g., Postgres), and data warehouses (e.g., Snowflake).

The distributed runtime will extend DBSP's data-parallel execution model to multiple nodes for high availability and throughput.

Applications

TODO

Documentation

The project is still in its early days. API and internals documentation is coming soon.

TODO

Contributing

Setting up git hooks

Execute the following commands to install a pre-push hook, so that git checks the code before each push:

GITDIR=$(git rev-parse --git-dir)
ln -sf "$(pwd)/tools/pre-push" "${GITDIR}/hooks/pre-push"

Running Benchmarks against DBSP

The repository has a number of benchmarks available in the benches directory that provide a comparison of DBSP's performance against a known set of tests.

Each benchmark has its own options and behavior, as outlined below.

Nexmark Benchmark

You can run the complete set of Nexmark queries, with the default settings, with:

cargo bench --bench nexmark --features with-nexmark

By default this will run each query with a total of 100 million events emitted at 10M per second (by two event generator threads), using 2 CPU cores for processing the data.

To run just the one query, q3, with only 10 million events, but using 8 CPU cores to process the data and 6 event generator threads, you can run:

cargo bench --bench nexmark --features with-nexmark -- --query q3 --max-events 10000000 --cpu-cores 8 --num-event-generators 6

For further options that you can use with the Nexmark benchmark, run:

cargo bench --bench nexmark --features with-nexmark -- --help

An extensive blog post about the implementation of Nexmark in DBSP: https://liveandletlearn.net/post/vmware-take-3-experience-with-rust-and-dbsp/

database-stream-processor's People

Contributors

absoludity, blp, gz, kixiron, lalithsuresh, rtjohnso, ryzhyk, utaal, vmwghbot


database-stream-processor's Issues

Better name for async operators.

Async is not a good name, as these operators are still synchronous as in "synchronous circuits".

Some proposed alternatives (but I think we can do better): "interface", "extern", "EnvironmentReader", "EventDriven", "notifier".

Don't keep in-progress batches.

Is your feature request related to a problem? Please describe.

The design for traces that we inherited from DD can stop an in-progress merge of two batches if it runs out of fuel halfway through the merge. Such an in-progress merge can use 2x memory since it keeps the original batches around until the merge completes. I wonder how important the optimization is and whether we can make things simpler and more memory-efficient by always running each merge to completion.

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response

Unexpected CyclicCircuit error

Ran into the following error (CyclicCircuit) trying to build a circuit; I'm fairly certain this should work.

Minimal reproducible example:

use dbsp::Circuit;

fn main() {
    let (_, _) = Circuit::build(|circuit| {
        let (action_stream, action_handle) = circuit.add_input_stream();

        let _ = circuit.iterate(|child| {
            let _ = action_stream
                .apply_owned(|action: i32| action)
                .delta0(child);
            Ok((
                move || Ok(true),
                ()
            ))
        }).unwrap();

        action_handle
    }).unwrap();
}

Errors with the following on the current git (dbsp = { git = "https://github.com/vmware/database-stream-processor" }):

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: CyclicCircuit { node_id: GlobalNodeId([NodeId(1)]) }', crates/pact-server/src/main.rs:24:8
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Remove thread locals from circuits

Is your feature request related to a problem? Please describe.

We currently do a number of weird things with thread locals and parking, incurring an extra dependency and other weirdness from using "global" values. Firstly, we can use JoinHandle::thread().unpark() to unpark threads and remove the need for Parker and Unparkers. Secondly, we can directly give a single Arc<AtomicBool> to each worker thread so that they all check one shared atomic instead of creating a new atomic for each worker (we can also just use Acquire/Release ordering for said boolean). The only problem (and what got me stuck on implementing this) is that it requires handing a handle to something around to the circuit. I was using Runtime for this until I got to some of the tests that don't use any sort of runtime to power them. Ultimately, though, this would be a good change: it would simplify things while also allowing operators uniform access to information.
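The two ideas above can be sketched with the standard library alone. This is a minimal illustration, not DBSP code: spawn_workers and stop_workers are hypothetical names, and the "work" each worker does is simply to park until a shared flag is set.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Spawns `n` workers that park until the shared `kill` flag is set.
/// (Hypothetical helper for illustration; not part of DBSP.)
fn spawn_workers(n: usize, kill: Arc<AtomicBool>) -> Vec<JoinHandle<usize>> {
    (0..n)
        .map(|id| {
            let kill = Arc::clone(&kill);
            thread::spawn(move || {
                // `park` may wake spuriously, so re-check the flag in a loop.
                while !kill.load(Ordering::Acquire) {
                    thread::park();
                }
                id
            })
        })
        .collect()
}

/// Sets the single shared flag, wakes every worker through its
/// `JoinHandle`, and joins the workers.
fn stop_workers(kill: &AtomicBool, workers: Vec<JoinHandle<usize>>) -> Vec<usize> {
    kill.store(true, Ordering::Release);
    for worker in &workers {
        // No separate Parker/Unparker pair is needed.
        worker.thread().unpark();
    }
    workers.into_iter().map(|w| w.join().unwrap()).collect()
}

fn main() {
    let kill = Arc::new(AtomicBool::new(false));
    let workers = spawn_workers(4, Arc::clone(&kill));
    let ids = stop_workers(&kill, workers);
    println!("stopped workers {:?}", ids);
}
```

Because an unpark that arrives before the matching park makes that park return immediately, a worker that checks the flag just before it is set does not sleep forever.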

Describe the solution you'd like

.

Describe alternatives you've considered

.

Additional context

.

Optimized incremental aggregation for linear incremental functions.

Is your feature request related to a problem? Please describe.

Linear aggregates can be computed incrementally. Converting the output of a linear aggregate to a Z-set is unfortunately no longer linear. The question is whether there's anything useful we can do with regular sets or perhaps with Z-sets with non-standard weights (since the output of a linear aggregate can be modeled as a Z-set whose weight type is the value of the aggregate). If so, we can expose the purely linear aggregation API; otherwise, it will have to be bundled with Z-set conversion.
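To illustrate what "linear" buys us, here is a small sketch that models a Z-set as a map from values to integer weights. The ZSet alias and both helper functions are assumptions made for this example, not DBSP types. Because the weighted sum satisfies sum(a + b) = sum(a) + sum(b), the aggregate can be updated from the delta alone.

```rust
use std::collections::BTreeMap;

/// Toy Z-set: each value maps to an integer weight (negative = deletion).
/// (Illustrative model only; DBSP's real Z-set types differ.)
type ZSet = BTreeMap<i64, i64>;

/// Weighted sum, a linear aggregate: sum(a + b) == sum(a) + sum(b).
fn weighted_sum(zset: &ZSet) -> i64 {
    zset.iter().map(|(value, weight)| value * weight).sum()
}

/// Adds `delta` into `acc`, dropping entries whose weight becomes zero.
fn apply_delta(acc: &mut ZSet, delta: &ZSet) {
    for (value, weight) in delta {
        let updated = {
            let entry = acc.entry(*value).or_insert(0);
            *entry += *weight;
            *entry
        };
        if updated == 0 {
            acc.remove(value);
        }
    }
}

fn main() {
    let mut table: ZSet = BTreeMap::from([(10, 1), (20, 2)]);
    let mut total = weighted_sum(&table);

    // One copy of 20 is deleted and one copy of 5 is inserted.
    let delta: ZSet = BTreeMap::from([(20, -1), (5, 1)]);

    // Incremental update: only the delta is inspected.
    total += weighted_sum(&delta);

    // Recomputing from scratch gives the same answer.
    apply_delta(&mut table, &delta);
    assert_eq!(total, weighted_sum(&table));
    println!("total = {total}");
}
```

The non-linear step the issue refers to comes afterwards: turning the old and new totals into output tuples requires retracting the old value and inserting the new one, which depends on the accumulated state and not just on the delta.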

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response

Non-equi joins

DBSP should be able to efficiently handle incremental joins that use conditions other than field equality; they are still bilinear operators.
The current API based on indexes does not allow expressing non-equi joins.
As a side note, this is something that DDlog probably can't implement efficiently.

Circuit I/O API

RFC: Circuit I/O API

This RFC proposes an API for feeding data to a circuit (so really just the "I"
part of "I/O" for now) and for driving the execution of the circuit with and
without a multithreaded runtime.

Model of Computation: Synchronous Transactions

A transaction is the user's view of one iteration of a synchronous circuit. It
consumes a bunch of input data and produces corresponding outputs. A new
transaction cannot be started until the current transaction completes (however,
the multithreaded API allows pushing data to the circuit while a transaction is
running; this data will be processed by the next transaction). Transactions
cannot be rolled back. A transaction also cannot fail. An error while
executing a transaction (e.g., out of memory or disk read error) leaves the
circuit in a failed state, at which point it needs to be restarted (possibly
from the last checkpoint).

One might argue that this notion of a transaction is weak compared to what
people normally expect. I don't mind using different terminology.

API

Input handles

An input handle is a handle to an input stream of a circuit that can be used to
feed data to it. Implementation-wise, input handles connect to special source
operators of type Input:

                               Circuit
                ┌──────────────────────────────────┐
┌──┐            │  ┌────────┐ Stream<T1>           │
│h1├────────────┼─►│Input1  ├────►                 │
└──┘            │  └────────┘                      │
                │                                  │
┌──┐            │  ┌────────┐ Stream<T2>           │
│h2├────────────┼─►│Input2  ├────►                 │
└──┘            │  └────────┘                      │
                │                                  │
┌──┐            │  ┌────────┐ Stream<T3>           │
│h3├────────────┼─►│Input3  ├────►                 │
└──┘            │  └────────┘                      │
                └──────────────────────────────────┘

An input handle works both with and without a multithreaded runtime.
In the former case it can be used to push data to the circuit from multiple threads,
asynchronously with the execution of the circuit.

InputHandle<T>

A handle to an input stream holding values of type T. T is not necessarily a collection type (Batch). It can be for instance a single number representing physical time.

Methods:

  • set_for_worker(&self, worker_index: usize, val: T) - set value to send to the
    stream for the specified worker during the next transaction.

  • set_for_all_workers(&self, val: T) - set value to send to the stream
    for the next transaction. The same value will be sent to all worker threads
    (useful, e.g., for physical clock input, which must be consistent across workers).

Workers whose inputs have not been set using one of these methods will receive T::default().
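The per-worker semantics described above can be modeled in a few lines. This is a single-threaded toy, not the proposed implementation: ToyInputHandle and take_inputs are hypothetical names, and the methods take &mut self instead of &self to avoid any synchronization.

```rust
/// Toy stand-in for the proposed `InputHandle<T>` (illustration only).
struct ToyInputHandle<T> {
    /// One pending slot per worker; `None` means "not set for the next transaction".
    pending: Vec<Option<T>>,
}

impl<T: Clone + Default> ToyInputHandle<T> {
    fn new(workers: usize) -> Self {
        Self { pending: vec![None; workers] }
    }

    /// Sets the value that one specific worker receives in the next transaction.
    fn set_for_worker(&mut self, worker_index: usize, val: T) {
        self.pending[worker_index] = Some(val);
    }

    /// Sends the same value to every worker in the next transaction.
    fn set_for_all_workers(&mut self, val: T) {
        for slot in &mut self.pending {
            *slot = Some(val.clone());
        }
    }

    /// Hypothetical hook called once per transaction: yields each worker's
    /// input (or `T::default()` if unset) and clears the slots.
    fn take_inputs(&mut self) -> Vec<T> {
        self.pending
            .iter_mut()
            .map(|slot| slot.take().unwrap_or_default())
            .collect()
    }
}

fn main() {
    let mut clock = ToyInputHandle::<u64>::new(3);
    clock.set_for_worker(1, 7);
    println!("{:?}", clock.take_inputs());
    clock.set_for_all_workers(42);
    println!("{:?}", clock.take_inputs());
}
```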

IndexedZSetHandle<K, V, W>

A handle to a collection of weighted (key, value) tuples. Automatically shards tuples
across workers.

Methods:

  • push(&self, k, v, w)
  • push_tuples(&self, tuples: &[(K, V, W)])
  • rollback(&self) - clear all uncommitted updates

SetHandle<T>

A handle to a collection that behaves as a set: all values have weight 1, duplicate
inserts and deletes are ignored. Automatically shards values across workers. DBSP
will internally cache the contents of the set (in a trace).

Methods:

  • delete(k) - delete value if exists
  • insert(k) - insert value if does not exist
  • rollback(&self) - clear all uncommitted updates

MapHandle<K, V>

A handle to a collection that behaves as a key-value map: exactly one value per key, all weights
are 1. Automatically shards key/value pairs across workers. DBSP internally stores the contents
of the map (in a trace).

Methods:

  • delete_key(k) - delete the value associated with a key (if exists)
  • upsert(k, v) - insert a key/value tuple, deleting existing values associated with the key, if any.
  • rollback(&self) - clear all uncommitted updates

At commit time, updates are applied in the order they are submitted.
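As a rough model of these semantics, the sketch below buffers updates, applies them in submission order at commit time, and reports the weighted changes a circuit would see. ToyMapHandle, its commit method, and the Update enum are assumptions for illustration. The RFC does not specify how the real handle is implemented, and sharding across workers is omitted.

```rust
use std::collections::BTreeMap;

/// A buffered, not yet committed update.
enum Update<K, V> {
    Upsert(K, V),
    Delete(K),
}

/// Toy model of the proposed `MapHandle<K, V>` (illustration only).
struct ToyMapHandle<K, V> {
    /// Stands in for the trace that caches the map contents.
    committed: BTreeMap<K, V>,
    pending: Vec<Update<K, V>>,
}

impl<K: Ord + Clone, V: Clone> ToyMapHandle<K, V> {
    fn new() -> Self {
        Self { committed: BTreeMap::new(), pending: Vec::new() }
    }

    fn upsert(&mut self, k: K, v: V) {
        self.pending.push(Update::Upsert(k, v));
    }

    fn delete_key(&mut self, k: K) {
        self.pending.push(Update::Delete(k));
    }

    /// Clears all uncommitted updates.
    fn rollback(&mut self) {
        self.pending.clear();
    }

    /// Applies pending updates in submission order and returns the
    /// (unconsolidated) weighted changes: -1 retracts, +1 inserts.
    fn commit(&mut self) -> Vec<((K, V), i64)> {
        let mut delta = Vec::new();
        for update in self.pending.drain(..) {
            match update {
                Update::Upsert(k, v) => {
                    if let Some(old) = self.committed.insert(k.clone(), v.clone()) {
                        delta.push(((k.clone(), old), -1));
                    }
                    delta.push(((k, v), 1));
                }
                Update::Delete(k) => {
                    // Deleting a missing key is a no-op.
                    if let Some(old) = self.committed.remove(&k) {
                        delta.push(((k, old), -1));
                    }
                }
            }
        }
        delta
    }
}

fn main() {
    let mut map = ToyMapHandle::new();
    map.upsert("a", 1);
    map.upsert("a", 2);
    println!("{:?}", map.commit());
}
```

The key point is that the handle needs the current contents of the map to translate an upsert into a retraction plus an insertion, which is why the RFC says the contents are stored internally.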

CircuitHandle

A handle to a circuit running in the local thread. This abstraction already
exists under the unfortunate name Root.

  • step(&self) (needs a better name) - runs the circuit for a single clock cycle, feeding any
    inputs accumulated since the previous call.
  • compact(&self) -> bool - performs some trace compaction work. Since the circuit runs on the
    caller's instruction pointer, this needs to be scheduled explicitly. Returns true if a
    non-zero amount of compaction was performed; false means there is nothing more to compact.

DBSPHandle

A handle to a multithreaded runtime running N identical copies of a circuit.
This abstraction already exists under the name Runtime.

  • run(Fn(Circuit<()>) -> Result<T, DBSPError>) -> (DBSPHandle, T)

  • commit() -> Result<(), DBSPError> - runs all copies of the circuit in the runtime for one
    clock cycle, consuming any inputs received since the previous commit. The method returns
    after fully processing all inputs and producing results. Multiple concurrent calls are
    serialized.

  • stop - terminate the runtime along with all workers.

  • TODO: Monitoring/profiling APIs.

nexmark fails on latest master [e50067c]

rev: e50067c

cargo +nightly bench --bench nexmark --features with-nexmark -- --first-event-rate=10000000 --max-events=1000000 --cpu-cores 8 --num-event-generators 6 --source-buffer-size 10000 --input-batch-size 40000

1,000,000 / 1,000,000 [=====================================================================] 100 % 2,387,494.1547/s 0sStarting q17 bench of 1000000 events...
40,000 / 1,000,000 [==>-----------------------------------------------------------------------] 4 % 1,745,934.5695/s 1sthread 'thread 'dbsp-worker-3' panicked at 'index out of bounds: the len is 373 but the index is 18446744073709551615', src/trace/layers/ordered.rsthread 'dbsp-worker-1' panicked at 'index out of bounds: the len is 377 but the index is 18446744073709551615', dbsp-worker-5thread '' panicked at 'dbsp-worker-4index out of bounds: the len is 373 but the index is 18446744073709551615' panicked at '', src/trace/layers/ordered.rssrc/trace/layers/ordered.rs::914:914914::2222
index out of bounds: the len is 371 but the index is 18446744073709551615
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
', src/trace/layers/ordered.rs:914:22:
22
thread 'dbsp-worker-7' panicked at 'index out of bounds: the len is 377 but the index is 18446744073709551615', src/trace/layers/ordered.rsthread 'dbsp-worker-6' panicked at 'index out of bounds: the len is 374 but the index is 18446744073709551615', src/trace/layers/ordered.rs:914:22
:914:22
thread 'dbsp-worker-0' panicked at 'index out of bounds: the len is 381 but the index is 18446744073709551615', src/trace/layers/ordered.rs:914:22
thread 'dbsp-worker-2' panicked at 'index out of bounds: the len is 380 but the index is 18446744073709551615', src/trace/layers/ordered.rs:914:22
thread 'benchmark_consumer' panicked at 'called `Result::unwrap()` on an `Err` value: Runtime(WorkerPanic(0))', benches/nexmark/main.rs:67:29

RFC: Parallelizing circuits.

  • Multi-threaded runtime
  • Exchange operator (non-incremental)
  • Dynamic scheduling
  • Incremental exchange operators

We discuss strategies for parallelizing the execution of DBSP circuits.

NOTE: This is orthogonal to the question of speeding up individual operators
using SIMD instructions or GPUs.

The high-level goal is to shard processing across multiple worker threads while
minimizing bottlenecks due to thread communication and synchronization. As a
starting point for the design, the DD architecture where each thread runs its
own copy of the circuit seems suitable. These copies happen to have identical
topologies, i.e., contain the same operators and subcircuits connected in the
same way (not strictly required, but I have a hard time imagining why we would
want something else).

Fragments of the circuit may execute locally without any inter-thread
communication, e.g., assuming input tables have been sharded, a sequence of
linear operators, including group_by and linear aggregates, can execute
locally with results from multiple workers assembled at the end of the pipeline.

Other operators, most notably joins, may require communication, but even that
can sometimes be avoided, e.g., R1(x,y), R2(y,z) can be evaluated locally
assuming R1 and R2 are sharded so that records that share the same value of
y are produced by the same worker. With DDlog we've seen workloads that in
theory could have been partitioned based on some attribute with the bulk of
processing occurring without cross-worker communication. But of course we
couldn't take advantage of this when running on top of DD.

We therefore do not want to automatically shard all operators or all streams.
Instead, sharding decisions should be left to the compiler or programmer. A
modular way to achieve this is to encapsulate sharding inside special
Exchange operators. Such an operator reads locally computed values from its
input stream, distributes them across its peers in other workers based on a
sharding function, and outputs the set of values received from peers:

                                                 Exchange
                                                 ┌───────┐
            ┌───────┐                            │       │
            │ op 1  ├──────────────┐             │       │
            └───────┘              ▲             │       │
                               ┌───────┐         │       │          ┌───────┐
                               │  op 3 ├─────────┼─┬──┬──┼─────────►│ op 4  │
                               └───────┘         │ │  │  │          └───────┘
            ┌───────┐              ▲             │ │  │  │
            │ op 2  ├──────────────┘             │ │  │  │
            └───────┘                            │ │  │  │
WORKER 1                                         │ │  │  │
─────────────────────────────────────────────────┼─┼──┼──┼──────────────────────────
WORKER 2                                         │ │  │  │
            ┌───────┐                            │ │  │  │
            │ op 1  ├──────────────┐             │ │  │  │
            └───────┘              ▼             │ │  │  │
                               ┌───────┐         │ │  │  │          ┌───────┐
                               │  op 3 ├─────────┼─┴──┴──┼─────────►│ op 4  │
                               └───────┘         │       │          └───────┘
            ┌───────┐              ▲             │       │
            │ op 2  ├──────────────┘             │       │
            └───────┘                            └───────┘

An interesting question is: how are Exchange operators scheduled? The simplest
approach is to stick to static scheduling. When an Exchange operator is
evaluated, it distributes its inputs across peer mailboxes and then blocks
waiting for all peers to put values in its incoming mailboxes. This means that
the whole pipeline will run at the speed of the slowest worker.
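The blocking behavior described above can be sketched with standard library channels standing in for mailboxes. This is an illustration under assumptions, not the DBSP Exchange operator: the exchange and by_modulo functions are hypothetical, values are plain u64s, and each worker is a freshly spawned thread that performs a single exchange step.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Example sharding function: route a value by its remainder modulo the worker count.
fn by_modulo(value: u64, workers: usize) -> usize {
    (value as usize) % workers
}

/// Runs one exchange step. Worker `i` starts with `inputs[i]`, routes each
/// value to worker `shard(value, n)`, then blocks until it has received one
/// batch from every worker (including itself).
fn exchange(inputs: Vec<Vec<u64>>, shard: fn(u64, usize) -> usize) -> Vec<Vec<u64>> {
    let n = inputs.len();
    // One mailbox per worker; every worker holds a sender to every mailbox.
    let (senders, receivers): (Vec<Sender<Vec<u64>>>, Vec<Receiver<Vec<u64>>>) =
        (0..n).map(|_| channel()).unzip();

    let handles: Vec<_> = inputs
        .into_iter()
        .zip(receivers)
        .map(|(local, mailbox)| {
            let peers = senders.clone();
            thread::spawn(move || {
                // Send phase: partition local values, one batch per peer.
                let mut outgoing = vec![Vec::new(); n];
                for value in local {
                    outgoing[shard(value, n)].push(value);
                }
                for (peer, batch) in peers.iter().zip(outgoing) {
                    peer.send(batch).unwrap();
                }
                // Receive phase: block until all n batches have arrived.
                let mut received: Vec<u64> =
                    (0..n).flat_map(|_| mailbox.recv().unwrap()).collect();
                // Sort only to make the result deterministic for the example.
                received.sort_unstable();
                received
            })
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let outputs = exchange(vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8]], by_modulo);
    println!("{:?}", outputs);
}
```

Every worker's receive phase waits for a batch from each peer, so no worker can proceed past the exchange before the slowest peer has finished sending.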

As an optimization, we can implement communication-aware static scheduling: we
split each Exchange operator into an input half that writes data to peer
mailboxes and an output half that collects data from peers. We then compute a
static schedule that tries to evaluate all input halves as early as possible and
all output halves as late as possible, giving stragglers time to catch up and
reducing waiting times.

Finding an optimal static schedule can be hard or impossible. Therefore, the
next option is to implement a dynamic scheduler that schedules the output halves
of Exchange operators as they become ready. This will require a new mechanism
for an operator to inform the scheduler about its status. Even with this
design, synchronization will be a bottleneck for some workloads.

We therefore consider a more radical solution that takes advantage of
incremental computation to reduce data dependencies between workers. Rather
than waiting for inputs from all peers to become available, Exchange operators
can be placed in an iterative scope. At each nested clock cycle, an Exchange
operator yields new inputs received from its peers. The circuit processes these
inputs before reading more inputs at the next iteration. Iteration continues
until all data has been sent and received.

We don't need to choose one of the above options. There's enough modularity in
the design of the framework to support all or some of them. We can start with
implementing a simple (non-iterative) exchange operator and using it with a
static scheduler. As the next step, we add dynamic scheduling and see how much
difference it makes in benchmarks. We can experiment with iterative Exchange
operators if we still see bottlenecks.

Publish DBSP on crates.io

Is your feature request related to a problem? Please describe.

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response

Versioning

Should we start using semantic versioning for DBSP?

rewind_keys sets valid unconditionally in OrdZSet

This test currently passes, but after discussion with Leonid I think both cursors should have !val_valid() (i.e., the spine impl is correct but OrdZSet isn't).
OrdZSet sets valid to true unconditionally on every rewind_keys, but should probably check whether the set is empty (or check something stronger/weaker; I'm not sure what the right condition is).

#[test]
fn silly() {
    let zset_builder = ord::zset_batch::OrdZSetBuilder::<usize, usize>::new(());
    let zset = zset_builder.done();
    let mut cursor = zset.cursor();
    cursor.rewind_keys();
    assert!(cursor.val_valid());

    let mut spine = Spine::<OrdZSet<usize, usize>>::new(None);
    spine.insert(zset.clone());
    let mut spine_cursor = spine.cursor();
    spine_cursor.rewind_keys();
    assert!(!spine_cursor.val_valid());
}

Lazily consolidate streams

Currently pretty much every operator expects a sorted & consolidated collection and produces a sorted & consolidated collection, but this work is very often redundant. Ephemeral operators like .filter() or .map() don't care about the order or consolidated-ness of their inputs or outputs, so there's no need to always marshal their inputs/outputs into OrdZSet and the like. They can consume arbitrary inputs and produce unordered batches of values.

TopK and Sorting operators

TopK is a very important operator for streaming systems and databases as a whole, as is the closely related ability to sort data, so we should support both.
Both operations can be done in a linear or a tree-like manner, and we should probably offer both for greater flexibility. The tree-like form partitions the input data into N streams, sorts (or takes the top K of) each stream, and then recombines the partial results via sort/TopK in a tree until a single result is reached. This can allow for partial incrementality at the cost of memory.
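A non-incremental sketch of the tree-like form, assuming plain i64 values and "largest first" ordering (top_k and tree_top_k are hypothetical names, not DBSP operators):

```rust
/// Linear form: returns the `k` largest values in descending order.
fn top_k(mut values: Vec<i64>, k: usize) -> Vec<i64> {
    values.sort_unstable_by(|a, b| b.cmp(a));
    values.truncate(k);
    values
}

/// Tree-like form: take the top `k` of each partition, then combine the
/// partial results pairwise until a single result remains.
fn tree_top_k(partitions: Vec<Vec<i64>>, k: usize) -> Vec<i64> {
    let mut level: Vec<Vec<i64>> =
        partitions.into_iter().map(|p| top_k(p, k)).collect();
    while level.len() > 1 {
        level = level
            .chunks(2)
            .map(|pair| top_k(pair.concat(), k))
            .collect();
    }
    level.pop().unwrap_or_default()
}

fn main() {
    let partitions = vec![vec![5, 1, 9], vec![7, 3], vec![8, 2, 6]];
    println!("{:?}", tree_top_k(partitions, 3));
}
```

In an incremental setting, each inner node of the tree would retain its partial result (the memory cost mentioned above), so that a change in one partition only requires recomputing the nodes on the path from that partition to the root.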

Traces with multiple batch types.

Is your feature request related to a problem? Please describe.

We would like to be able to add untimed batches (Batch<Time=()>) to a timed trace. Currently this requires re-constructing the batch by adding the same timestamp value to each tuple. Instead we could implement a new batch type that stores a single copy of the timestamp and have traces access such batches. A regular batch is produced only when two batches with different timestamps are merged. This requires a trace implementation built on top of at least two different batch types.

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response

Fix operator naming

Is your feature request related to a problem? Please describe.

Operator names are currently a mess. For example, join_trace is really an incremental join on nested streams; the same goes for distinct_trace. We need some kind of naming system that encodes what operators do instead of how they are implemented.

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response

Structured operator summaries.

Is your feature request related to a problem? Please describe.

The Operator::summary method returns printable operator metadata as a string. We would like to use a more structured representation, but since metadata is operator-specific, we cannot use a pre-defined structure here. A dynamic representation (e.g., JSON) may be the sweet spot.

This will also allow us to eliminate the dependency on textwrap.

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response

Implement `BatchReader` trait for traces.

Is your feature request related to a problem? Please describe.

Similar to BatchReader, the TraceReader API exposes an iterator over all elements in the trace. The two APIs can be reconciled with a bit of work. This way, any operator that works on batches would also work on traces, e.g., Join currently takes a batch as input. When using Join as part of join_incremental computation, one of the input batches is an integral of the input stream. With this change we will be able to switch to integrating streams of (indexed) Z-sets into a trace instead of a batch and everything would still work.

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response

Replace `UnsafeCell` with `RefCell` in `circuit_builder.rs`

Is your feature request related to a problem? Please describe.

UnsafeCell was a premature optimization. Its use is not well encapsulated, and it likely enables unsafe behavior.

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response

Eliminate dependency on timely.

Is your feature request related to a problem? Please describe.

We are only using a couple of timely traits: PartialOrder and Antichain. Copy them to DBSP to completely eliminate the dependency.
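For a sense of how little code is involved, here is a simplified sketch of the two abstractions. It is modeled loosely on the timely traits of the same names but is not a copy of them: the method set is reduced to the minimum, and the product-order impl for pairs is added only to make the example runnable.

```rust
/// A partial order; simplified stand-in for timely's `PartialOrder`.
trait PartialOrder: PartialEq {
    fn less_equal(&self, other: &Self) -> bool;

    fn less_than(&self, other: &Self) -> bool {
        self.less_equal(other) && self != other
    }
}

/// Product order on pairs, used here only as an example of a partial order.
impl PartialOrder for (u32, u32) {
    fn less_equal(&self, other: &Self) -> bool {
        self.0 <= other.0 && self.1 <= other.1
    }
}

/// A set of mutually incomparable (minimal) elements.
struct Antichain<T> {
    elements: Vec<T>,
}

impl<T: PartialOrder> Antichain<T> {
    fn new() -> Self {
        Self { elements: Vec::new() }
    }

    /// Inserts `element` unless some existing element is already less than or
    /// equal to it; removes any elements that `element` precedes.
    /// Returns true if the element was inserted.
    fn insert(&mut self, element: T) -> bool {
        if self.elements.iter().any(|e| e.less_equal(&element)) {
            return false;
        }
        self.elements.retain(|e| !element.less_equal(e));
        self.elements.push(element);
        true
    }

    fn elements(&self) -> &[T] {
        &self.elements
    }
}

fn main() {
    let mut frontier = Antichain::<(u32, u32)>::new();
    frontier.insert((2, 3));
    frontier.insert((3, 1));
    println!("{:?}", frontier.elements());
}
```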

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response

Add LDBC Benchmarks

LDBC Benchmarks are a great source of tests that really home in on the things we need to benchmark:

  • LDBC Social Network Benchmark (LDBC-SNB)
    • The Social Network Benchmark's Interactive workload focuses on transactional graph processing with complex read queries that access the neighbourhood of a given node in the graph and update operations that continuously insert new data in the graph
    • The Social Network Benchmark's Business Intelligence workload focuses on aggregation- and join-heavy complex queries touching a large portion of the graph with microbatches of insert/delete operations
  • LDBC Graphalytics Benchmark (LDBC Graphalytics), which focuses on large-scale graph analysis
  • LDBC Semantic Publishing Benchmark (LDBC-SPB): SPB performance is measured by producing a workload of CRUD (Create, Read, Update, Delete) operations which are executed simultaneously. The benchmark offers a data generator that uses real reference data to produce datasets of various sizes and tests the scalability aspect of RDF systems. The benchmark workload consists of (a) editorial operations that add new data or alter or delete existing data, and (b) aggregation operations that retrieve content according to various criteria

These are a really good mix of both static and streaming datasets, which is exactly what we need.
Prior art from Frank McSherry w/ DDFlow

Async trace compaction

Is your feature request related to a problem? Please describe.

Use slack time to compact traces.

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response

Add a free-form issue template

Describe the bug

We currently have two very strict issue templates, and not all issues fit them. We should add a free-form template that lets users write whatever they want.

Reproduction steps

.

Expected behavior

.

Additional context

.

Move time tracking to the circuit.

Is your feature request related to a problem? Please describe.

Instead of having each operator track current time, move time tracking into the circuit.

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response

Remove integrate_nested, Z1Nested.

Is your feature request related to a problem? Please describe.

The integrate_nested method is only used in tests and should be removed along with Z1Nested.

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response

`Cursor::seek` should return the number of keys skipped over during search

Is your feature request related to a problem? Please describe.

This will help us preallocate the right amount of buffer space in some situations, see e.g., the TODO in window.rs.
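A minimal sketch of the proposed return value, using a hypothetical `SliceCursor` over a sorted slice rather than DBSP's actual `Cursor` trait (whose signature is not shown here):

```rust
/// Hypothetical cursor over a sorted slice of keys (not DBSP's `Cursor` trait).
pub struct SliceCursor<'a, K> {
    keys: &'a [K],
    pos: usize,
}

impl<'a, K: Ord> SliceCursor<'a, K> {
    pub fn new(keys: &'a [K]) -> Self {
        Self { keys, pos: 0 }
    }

    /// Advances to the first key `>= target`, never moving backwards.
    /// Returns the number of keys skipped over.
    pub fn seek(&mut self, target: &K) -> usize {
        let remaining = &self.keys[self.pos..];
        // Binary search for the first key that is not less than `target`.
        let skipped = remaining.partition_point(|k| k < target);
        self.pos += skipped;
        skipped
    }

    /// The key under the cursor, or `None` once the cursor is exhausted.
    pub fn key(&self) -> Option<&K> {
        self.keys.get(self.pos)
    }
}
```

A caller could then use the returned count as a capacity hint, for example `Vec::with_capacity(skipped)`, instead of growing the buffer incrementally.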

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response

Count operator

We need to add a .count() operator that counts the number of values for any given key, e.g. (K, V).count() -> (K, isize)
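To make the intended semantics concrete, here is a standalone sketch over a plain slice. It assumes each `(K, V)` tuple carries an `isize` weight, as in a Z-set, and that the count of a key is the sum of the weights of its values. Both are assumptions, and `count` here is a free function, not a stream operator.

```rust
use std::collections::BTreeMap;

/// Counts values per key in a weighted collection of `((key, value), weight)`.
/// Keys whose weights cancel out to zero are omitted from the result.
pub fn count<K: Ord + Clone, V>(input: &[((K, V), isize)]) -> BTreeMap<K, isize> {
    let mut counts = BTreeMap::new();
    for ((key, _value), weight) in input {
        // Sum weights so that insertions (+w) and deletions (-w) cancel.
        *counts.entry(key.clone()).or_insert(0) += *weight;
    }
    counts.retain(|_, c| *c != 0);
    counts
}
```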

Add operators that take both owned and referential inputs

Is your feature request related to a problem? Please describe.

We currently have operators that take either owned values or references, but both just defer to an inner operator that explicitly handles reference and owned inputs separately. Generated code could benefit from handling the two cases differently, for example to avoid allocations.

Describe the solution you'd like

Add variants of operators that allow providing a closure to handle reference values and a closure to handle owned values
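One possible shape for such a variant, sketched outside the operator framework. The `Input` enum and `map_ref_or_owned` are hypothetical names used only for illustration:

```rust
/// Hypothetical input wrapper: an operator may receive a borrowed or an owned batch.
pub enum Input<'a, T> {
    Borrowed(&'a [T]),
    Owned(Vec<T>),
}

/// Applies `by_ref` to borrowed input and `by_owned` to owned input.
pub fn map_ref_or_owned<T, U>(
    input: Input<'_, T>,
    by_ref: impl Fn(&T) -> U,
    by_owned: impl Fn(T) -> U,
) -> Vec<U> {
    match input {
        // Borrowed values cannot be consumed, so `by_ref` typically has to clone.
        Input::Borrowed(items) => items.iter().map(by_ref).collect(),
        // Owned values can be moved into the closure and reused in place.
        Input::Owned(items) => items.into_iter().map(by_owned).collect(),
    }
}
```

With `String` values, for instance, the owned closure can append to the existing buffer, while the reference closure has to allocate a new string.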

Describe alternatives you've considered

No response

Additional context

No response

More doc examples.

Is your feature request related to a problem? Please describe.

Describe the solution you'd like

Add more doc examples when we have some interesting relational operators.

Describe alternatives you've considered

No response

Additional context

No response

RFC: Operator API design

RFC: Operator API design (so far just a list of problems with the current design)

The current API for assembling operators into circuits has poor ergonomics.
This is partially because it is intentionally low-level, designed to support maximal
flexibility as opposed to ease of use, and partially because it was built in
a bottom-up fashion, with little consideration for user experience.
My goal is to clean up the design where possible and also hide some low-level details
behind a less flexible but more user-friendly higher-level API.

Below is a summary of problems along with some ideas for addressing them, but first
I mention the aspects of the current design that work well and that I'd like to
preserve going forward.

The good

  • The functional style of instantiating operators, similar to DD, LINQ, and many other
    systems is nice compared to manually instantiating operators and connecting their
    outputs (the stream.map().join().distinct() stuff).

  • The operator caching feature allows creating high-level operators without introducing
    a performance hit, e.g., stream.distinct_trace() internally calls stream.trace().
    If another operator in the circuit calls stream.trace(), it will get a reference to
    the same trace rather than creating a new one.
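The caching idea in the second bullet can be sketched as follows. `Circuit`, `Node`, and `cached` are hypothetical stand-ins, not DBSP's types; the point is only that a second request for the same derived operator on the same stream returns the existing node.

```rust
use std::collections::HashMap;
use std::rc::Rc;

/// Hypothetical stand-in for an operator node in a circuit.
#[derive(Debug)]
pub struct Node {
    pub name: &'static str,
    pub input: usize,
}

/// Hypothetical circuit that caches derived operators per (input stream id, operator name).
#[derive(Default)]
pub struct Circuit {
    cache: HashMap<(usize, &'static str), Rc<Node>>,
    pub nodes_created: usize,
}

impl Circuit {
    /// Returns the cached operator for `(input, name)`, creating it on first use.
    pub fn cached(&mut self, input: usize, name: &'static str) -> Rc<Node> {
        let created = &mut self.nodes_created;
        let node = self.cache.entry((input, name)).or_insert_with(|| {
            *created += 1;
            Rc::new(Node { name, input })
        });
        node.clone()
    }
}
```

Calling `cached(0, "trace")` twice yields two handles to the same node and increments `nodes_created` only once.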

The bad

  1. API requires type annotations. Most operators are generic over their return type, i.e.,
    they can return any type that implements the ZSet trait. The goal is to support multiple
    ZSet implementations (e.g., in-memory and disk-backed), but the downside is that the
    programmer often needs to explicitly specify the return type using type annotations.

    • Proposed solution: PR #94

  2. Operator naming. Operator names are currently a complete mess. Names like
    join_trace for example make zero sense to the user. Ignoring the current lame naming
    scheme, a sensible naming convention is not easy to come up with because DBSP
    supports multiple flavors of the same operator. Linear operators (map, filter, index,
    etc.) are easy, as the same implementation works in incremental and non-incremental cases
    and for arbitrary nesting depth. join, aggregate, distinct, and other non-linear
    operators are more tricky. First, they have incremental and non-incremental versions.
    We typically use the former in practice, but there are situations where non-incremental
    operators are useful. Whether we use an _incremental suffix for incremental operators
    or a _non_incremental suffix (or something to that effect) for non-incremental operators,
    this will be easy to get wrong, since the type system currently doesn't distinguish between
    streams of changes and streams that carry complete collections. An additional complication
    is that some operators have incremental implementations specialized for different nesting
    depths. The type system does not currently enforce the use of the correct implementation
    for the given scope.

    • Proposed solution: PR #98

  3. Trace-based API vs generic API. Operators like integrate(), integrate_nested()
    work for any values that form a group, but are inefficient when working with Z-sets and
    indexed Z-sets, which is what we care about 99% of the time. Users should instead
    call methods of the trace-based API, e.g., integrate_trace. This is another potential
    source of confusion.

  4. Export API. The API to export a stream from a nested circuit is verbose and easy to
    get wrong. First, the user must call stream.integrate_trace() inside the circuit to
    sum up all deltas computed across all iterations of the circuit into a trace. In the
    parent circuit, the user must call consolidate to consolidate the trace into a single
    batch.

  5. Recursion API. Today, a recursive relation is created by manually instantiating a
    DelayedFeedback operator and closing the loop using its .connect() method. In addition,
    the user must explicitly apply distinct to the recursive relation to ensure convergence.
    This feels verbose and low-level.

    • Proposed solution to this and previous problem: PR #100

  6. Documentation. Once the above issues have been addressed, the resulting API
    needs nice top-down documentation with examples.
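To illustrate why item 5 requires distinct for convergence, here is a naive fixed-point computation of transitive closure in plain Rust. It does not use DelayedFeedback or any DBSP API; the `BTreeSet` plays the role of distinct by keeping each derived path once. On a cyclic graph, counting every derivation instead would keep producing new weights and the loop would never terminate.

```rust
use std::collections::BTreeSet;

/// Computes the transitive closure of `edges` by iterating to a fixed point.
pub fn transitive_closure(edges: &[(u32, u32)]) -> BTreeSet<(u32, u32)> {
    // Storing paths in a set plays the role of `distinct`: each path is kept once.
    let mut paths: BTreeSet<(u32, u32)> = edges.iter().copied().collect();
    loop {
        // One step of the recursive rule: path(a, c) :- path(a, b), edge(b, c).
        let new: Vec<(u32, u32)> = paths
            .iter()
            .flat_map(|&(a, b)| {
                edges
                    .iter()
                    .filter(move |&&(from, _)| from == b)
                    .map(move |&(_, c)| (a, c))
            })
            .filter(|p| !paths.contains(p))
            .collect();
        if new.is_empty() {
            return paths; // no new facts: the fixed point has been reached
        }
        paths.extend(new);
    }
}
```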

@absoludity

[RFC] Use hashbrown directly instead of HashMap.

Is your feature request related to a problem? Please describe.

HashMap doesn't allow creating an entry from a reference to a key (this feature is currently unstable and looks like it won't be stabilized any time soon). HashMap is just a wrapper around hashbrown, which does support this. I suggest we use hashbrown directly to get better performance.
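For context, this is roughly what the limitation looks like with the standard library alone. `bump` is a hypothetical helper. Because `HashMap::entry` takes the key by value, code that only has a `&str` must either allocate a `String` on every call or, as below, look the key up twice on first insertion.

```rust
use std::collections::HashMap;

/// Increments the counter for `key`, allocating an owned key only on first insertion.
pub fn bump(counts: &mut HashMap<String, u64>, key: &str) -> u64 {
    // First lookup: no allocation if the key is already present.
    if let Some(count) = counts.get_mut(key) {
        *count += 1;
        return *count;
    }
    // Second lookup (inside `insert`): hashes the key again and allocates the `String`.
    counts.insert(key.to_owned(), 1);
    1
}
```

An entry API that accepts a reference to the key would collapse the two lookups into one while still allocating only on insertion.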

@mbudiu-vmw , @Kixiron , what do you think?

Describe the solution you'd like

Describe alternatives you've considered

No response

Additional context

No response
