vmware / database-stream-processor
Streaming and Incremental Computation Framework
License: Other
Use slack time to compact traces.
Things like .recursive() don't have support for OrdIndexedZSet, which makes working with maps a lot harder.
... once its architecture is relatively stabilized.
Instead of having each operator track current time, move time tracking into the circuit.
rev: e50067c
cargo +nightly bench --bench nexmark --features with-nexmark -- --first-event-rate=10000000 --max-events=1000000 --cpu-cores 8 --num-event-generators 6 --source-buffer-size 10000 --input-batch-size 40000
1,000,000 / 1,000,000 [=====================================================================] 100 % 2,387,494.1547/s 0sStarting q17 bench of 1000000 events...
40,000 / 1,000,000 [==>-----------------------------------------------------------------------] 4 % 1,745,934.5695/s 1sthread 'thread 'dbsp-worker-3' panicked at 'index out of bounds: the len is 373 but the index is 18446744073709551615', src/trace/layers/ordered.rsthread 'dbsp-worker-1' panicked at 'index out of bounds: the len is 377 but the index is 18446744073709551615', dbsp-worker-5thread '' panicked at 'dbsp-worker-4index out of bounds: the len is 373 but the index is 18446744073709551615' panicked at '', src/trace/layers/ordered.rssrc/trace/layers/ordered.rs::914:914914::2222
index out of bounds: the len is 371 but the index is 18446744073709551615
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
', src/trace/layers/ordered.rs:914:22:
22
thread 'dbsp-worker-7' panicked at 'index out of bounds: the len is 377 but the index is 18446744073709551615', src/trace/layers/ordered.rsthread 'dbsp-worker-6' panicked at 'index out of bounds: the len is 374 but the index is 18446744073709551615', src/trace/layers/ordered.rs:914:22
:914:22
thread 'dbsp-worker-0' panicked at 'index out of bounds: the len is 381 but the index is 18446744073709551615', src/trace/layers/ordered.rs:914:22
thread 'dbsp-worker-2' panicked at 'index out of bounds: the len is 380 but the index is 18446744073709551615', src/trace/layers/ordered.rs:914:22
thread 'benchmark_consumer' panicked at 'called `Result::unwrap()` on an `Err` value: Runtime(WorkerPanic(0))', benches/nexmark/main.rs:67:29
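The out-of-bounds index in these panics, 18446744073709551615, equals usize::MAX on a 64-bit target, which is exactly what 0usize - 1 wraps around to. cargo bench builds with the bench profile, which inherits release settings where overflow checks are off by default, so this is consistent with an unsigned subtraction underflowing somewhere before the indexing at ordered.rs:914. The hypothetical helper below only demonstrates the arithmetic; it does not locate the actual bug.

```rust
/// Hypothetical helper: computes "the index before `i`" the way a release
/// build without overflow checks would, wrapping instead of panicking.
pub fn index_before(i: usize) -> usize {
    i.wrapping_sub(1)
}
```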
stream_start/stream_end are poorly named, as they reset all streams in a clock domain, not just a single stream.
This test currently passes, but after discussing it with Leonid, I think both cursors should have !val_valid() (i.e., the Spine implementation is correct but OrdZSet's isn't).
OrdZSet sets valid to true unconditionally on every rewind_keys, but it should probably check whether the set is empty (or check some stronger or weaker condition; I'm not sure what the right condition is).
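```rust
// Assumes this test lives inside the dbsp crate; the import paths below are
// a best guess and may need adjusting to the crate layout at this revision.
use crate::trace::{ord::{self, OrdZSet}, BatchReader, Builder, Cursor, Spine, Trace};

#[test]
fn silly() {
    // Build an empty OrdZSet.
    let zset_builder = ord::zset_batch::OrdZSetBuilder::<usize, usize>::new(());
    let zset = zset_builder.done();

    // Current behavior: OrdZSet's cursor reports a valid value on an empty set.
    let mut cursor = zset.cursor();
    cursor.rewind_keys();
    assert!(cursor.val_valid());

    // A Spine containing the same empty batch reports no valid value.
    let mut spine = Spine::<OrdZSet<usize, usize>>::new(None);
    spine.insert(zset.clone());
    let mut spine_cursor = spine.cursor();
    spine_cursor.rewind_keys();
    assert!(!spine_cursor.val_valid());
}
```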
This will help us preallocate the right amount of buffer space in some situations; see, e.g., the TODO in window.rs.
TopK (and the closely related sorting) is a very important operator for streaming systems and databases as a whole, so we should support both.
Both operations can be done in a linear or a tree-like manner; we should probably offer both for greater flexibility. (The tree-like forms partition the input data into N streams, sort/top-k each stream, and then recombine them via sort/top-k in a tree until a single result is reached. This can allow for partial incrementality at the cost of memory.)
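A minimal sketch of the tree-like top-k, assuming plain in-memory i64 values rather than DBSP streams; top_k and tree_top_k are hypothetical names. Each partition is reduced to its own top k, then partial results are merged pairwise, level by level. Because each internal node depends only on its two children's results, a change in one partition only invalidates the merges on its path to the root, which is where the partial incrementality comes from.

```rust
/// Top-k of one partition: sort descending and keep the first k values.
fn top_k(mut values: Vec<i64>, k: usize) -> Vec<i64> {
    values.sort_unstable_by(|a, b| b.cmp(a));
    values.truncate(k);
    values
}

/// Tree-like top-k: split the input into `n` partitions, take the top k of
/// each, then merge partial results pairwise, level by level, until one is left.
fn tree_top_k(input: &[i64], n: usize, k: usize) -> Vec<i64> {
    assert!(n > 0, "need at least one partition");
    let chunk = ((input.len() + n - 1) / n).max(1);
    let mut level: Vec<Vec<i64>> = input.chunks(chunk).map(|c| top_k(c.to_vec(), k)).collect();
    while level.len() > 1 {
        // Each internal node only needs the top k of its two children.
        level = level.chunks(2).map(|pair| top_k(pair.concat(), k)).collect();
    }
    level.pop().unwrap_or_default()
}
```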
This RFC proposes an API for feeding data to a circuit (so really just the "I"
part of "I/O" for now) and for driving the execution of the circuit with and
without a multithreaded runtime.
A transaction is the user's view of one iteration of a synchronous circuit. It
consumes a bunch of input data and produces corresponding outputs. A new
transaction cannot be started until the current transaction completes (however,
the multithreaded API allows pushing data to the circuit while a transaction is
running; this data will be processed by the next transaction). Transactions
cannot be rolled back. A transaction also cannot fail. An error while
executing a transaction (e.g., out of memory or disk read error) leaves the
circuit in a failed state, at which point it needs to be restarted (possibly
from the last checkpoint).
One might argue that this notion of a transaction is weak compared to what
people normally expect. I don't mind using different terminology.
An input handle is a handle to an input stream of a circuit that can be used to
feed data to it. Implementation-wise, input handles connect to special source
operators of type Input:
Circuit
┌──────────────────────────────────┐
┌──┐ │ ┌────────┐ Stream<T1> │
│h1├────────────┼─►│Input1 ├────► │
└──┘ │ └────────┘ │
│ │
┌──┐ │ ┌────────┐ Stream<T2> │
│h2├────────────┼─►│Input2 ├────► │
└──┘ │ └────────┘ │
│ │
┌──┐ │ ┌────────┐ Stream<T3> │
│h3├────────────┼─►│Input3 ├────► │
└──┘ │ └────────┘ │
└──────────────────────────────────┘
An input handle works both with and without a multithreaded runtime.
In the former case, it can be used to push data to the circuit from multiple threads,
asynchronously with the execution of the circuit.
InputHandle<T>
A handle to an input stream holding values of type T. T is not necessarily a
collection type (Batch); it can, for instance, be a single number representing physical time.
Methods:
set_for_worker(&self, worker_index: usize, val: T) - set the value to send to the
stream for the specified worker during the next transaction.
set_for_all_workers(&self, val: T) - set the value to send to the stream
for the next transaction. The same value will be sent to all worker threads
(useful, e.g., for physical clock input, which must be consistent across workers).
Workers whose inputs have not been set using one of these methods will receive T::default().
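A minimal sketch of these semantics, assuming one Mutex-protected slot per worker; InputHandleSketch and take_for_worker (the circuit-side read at the start of a transaction) are hypothetical names, not DBSP APIs. Because the methods take &self and the state sits behind a Mutex, the handle can be shared across threads.

```rust
use std::sync::Mutex;

/// Hypothetical sketch of the InputHandle<T> semantics (not DBSP code):
/// one pending slot per worker, consumed when a transaction starts.
pub struct InputHandleSketch<T> {
    pending: Mutex<Vec<Option<T>>>,
}

impl<T: Clone + Default> InputHandleSketch<T> {
    pub fn new(num_workers: usize) -> Self {
        Self { pending: Mutex::new(vec![None; num_workers]) }
    }

    /// Set the value one worker will see in the next transaction.
    pub fn set_for_worker(&self, worker_index: usize, val: T) {
        self.pending.lock().unwrap()[worker_index] = Some(val);
    }

    /// Set the same value for all workers (e.g., a physical clock reading).
    pub fn set_for_all_workers(&self, val: T) {
        for slot in self.pending.lock().unwrap().iter_mut() {
            *slot = Some(val.clone());
        }
    }

    /// Called for worker `worker_index` when a transaction starts: returns the
    /// pending value, or T::default() if none was set, and clears the slot.
    pub fn take_for_worker(&self, worker_index: usize) -> T {
        self.pending.lock().unwrap()[worker_index].take().unwrap_or_default()
    }
}
```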
IndexedZSetHandle<K, V, W>
A handle to a collection of weighted (key, value) tuples. Automatically shards tuples
across workers.
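A sketch of the automatic sharding, assuming the worker is chosen by hashing the key with std's DefaultHasher; ShardedBuffer and take_shards are hypothetical names, and the &mut self receivers are a simplification (the API above uses &self, which would need interior mutability such as a Mutex). Hashing the key keeps all updates to one key on the same worker.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical sketch (not DBSP code): buffers pushed (key, value, weight)
/// tuples per worker, choosing the worker by hashing the key.
pub struct ShardedBuffer<K, V, W> {
    per_worker: Vec<Vec<(K, V, W)>>,
}

impl<K: Hash, V, W> ShardedBuffer<K, V, W> {
    pub fn new(num_workers: usize) -> Self {
        assert!(num_workers > 0, "need at least one worker");
        Self { per_worker: (0..num_workers).map(|_| Vec::new()).collect() }
    }

    fn worker_for(&self, k: &K) -> usize {
        let mut hasher = DefaultHasher::new();
        k.hash(&mut hasher);
        (hasher.finish() % self.per_worker.len() as u64) as usize
    }

    pub fn push(&mut self, k: K, v: V, w: W) {
        let worker = self.worker_for(&k);
        self.per_worker[worker].push((k, v, w));
    }

    /// Discard all uncommitted updates.
    pub fn rollback(&mut self) {
        for buf in &mut self.per_worker {
            buf.clear();
        }
    }

    /// At commit time, hand each worker its shard and reset the buffers.
    pub fn take_shards(&mut self) -> Vec<Vec<(K, V, W)>> {
        self.per_worker.iter_mut().map(std::mem::take).collect()
    }
}
```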
Methods:
push(&self, k, v, w)
push_tuples(&self, tuples: &[(K, V, W)])
rollback(&self) - clear all uncommitted updates
SetHandle<T>
A handle to a collection that behaves as a set: all values have weight 1, duplicate
inserts and deletes are ignored. Automatically shards values across workers. DBSP
will internally cache the contents of the set (in a trace).
Methods:
delete(k) - delete the value if it exists
insert(k) - insert the value if it does not exist
rollback(&self) - clear all uncommitted updates
MapHandle<K, V>
A handle to a collection that behaves as a key-value map: exactly one value per key, all weights
are 1. Automatically shards key/value pairs across workers. DBSP internally stores the contents
of the map (in a trace).
Methods:
delete_key(k) - delete the value associated with a key (if it exists)
upsert(k, v) - insert a key/value tuple, deleting existing values associated with the key, if any
rollback(&self) - clear all uncommitted updates
At commit time, updates are applied in the order they are submitted.
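A minimal sketch, assuming an in-memory BTreeMap stands in for the trace DBSP would keep, of how an ordered list of upsert/delete_key calls could be turned into weighted (key, value, weight) changes at commit time. MapUpdate and commit are hypothetical names. Updates are applied in submission order; the output is not consolidated, so an upsert followed by a delete of the same key in one commit leaves a +1/-1 pair that a real implementation would cancel.

```rust
use std::collections::BTreeMap;

/// Hypothetical command type recorded by a MapHandle-like buffer.
pub enum MapUpdate<K, V> {
    Upsert(K, V),
    DeleteKey(K),
}

/// Apply `updates` in order to `state`, returning the resulting changes:
/// weight +1 for an inserted tuple and -1 for a removed one.
pub fn commit<K: Ord + Clone, V: Clone>(
    state: &mut BTreeMap<K, V>,
    updates: Vec<MapUpdate<K, V>>,
) -> Vec<(K, V, i64)> {
    let mut deltas = Vec::new();
    for update in updates {
        match update {
            MapUpdate::Upsert(k, v) => {
                // Replacing an existing value retracts the old tuple first.
                if let Some(old) = state.insert(k.clone(), v.clone()) {
                    deltas.push((k.clone(), old, -1));
                }
                deltas.push((k, v, 1));
            }
            MapUpdate::DeleteKey(k) => {
                if let Some(old) = state.remove(&k) {
                    deltas.push((k, old, -1));
                }
            }
        }
    }
    deltas
}
```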
CircuitHandle
A handle to a circuit running in the local thread. This abstraction already
exists under the unfortunate name Root.
step(&self) (needs a better name) - runs the circuit for a single clock cycle, feeding any …
compact(&self) -> bool - do some trace compaction work. Since the circuit runs on the … Returns true if …; false means there's nothing more to compact.
DBSPHandle
A handle to a multithreaded runtime running N identical copies of a circuit.
This abstraction already exists under the name Runtime.
run(Fn(Circuit<()>) -> Result<T, DBSPError>) -> (DBSPHandle, T)
commit() -> Result<(), DBSPError> - runs all copies of the circuit in the runtime for one
clock cycle, consuming any inputs received since the previous commit. The method returns
after fully processing all inputs and producing results. Multiple concurrent calls are
serialized.
stop - terminate the runtime along with all workers.
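A minimal sketch of the commit/stop control flow, using only std threads and channels; HandleSketch, run, and the closure that stands in for each worker's circuit are hypothetical, and commit returns () instead of Result<(), DBSPError>. A Mutex around the control channels serializes concurrent commit calls, and commit returns only after every worker has reported that its step finished.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;
use std::thread::{self, JoinHandle};

enum Command {
    Step,
    Stop,
}

struct Control {
    senders: Vec<Sender<Command>>,
    done: Receiver<()>,
}

/// Hypothetical stand-in for DBSPHandle: N workers, each running its own
/// copy of a "circuit" (here a closure) once per commit.
pub struct HandleSketch {
    control: Mutex<Control>,
    workers: Vec<JoinHandle<()>>,
}

impl HandleSketch {
    /// Spawn `n` workers; `make_step(i)` builds worker i's per-cycle step.
    pub fn run<F, S>(n: usize, make_step: F) -> Self
    where
        F: Fn(usize) -> S,
        S: FnMut() + Send + 'static,
    {
        let (done_tx, done_rx) = channel();
        let mut senders = Vec::new();
        let mut workers = Vec::new();
        for i in 0..n {
            let (tx, rx) = channel::<Command>();
            let done_tx = done_tx.clone();
            let mut step = make_step(i);
            workers.push(thread::spawn(move || {
                // Run one clock cycle per Step; exit on Stop or disconnect.
                while let Ok(Command::Step) = rx.recv() {
                    step();
                    let _ = done_tx.send(());
                }
            }));
            senders.push(tx);
        }
        Self { control: Mutex::new(Control { senders, done: done_rx }), workers }
    }

    /// Run one clock cycle on every worker and wait until all have finished.
    pub fn commit(&self) {
        let control = self.control.lock().unwrap();
        for tx in &control.senders {
            tx.send(Command::Step).unwrap();
        }
        for _ in 0..control.senders.len() {
            control.done.recv().unwrap();
        }
    }

    /// Terminate all workers and wait for them to exit.
    pub fn stop(self) {
        for tx in &self.control.lock().unwrap().senders {
            let _ = tx.send(Command::Stop);
        }
        for worker in self.workers {
            worker.join().unwrap();
        }
    }
}
```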
TODO: Monitoring/profiling APIs.
Should we start using semantic versioning for DBSP?
We currently do a number of weird things with thread locals and parking, incurring an extra dependency and other weirdness from using "global" values. First, we can use JoinHandle::thread().unpark() to unpark threads and remove the need for Parkers and Unparkers. Second, we can give every worker thread a clone of a single Arc<AtomicBool> so that they all check one shared atomic instead of creating a new atomic for each worker (we can also just use Acquire/Release ordering for that boolean). The only problem (and what got me stuck implementing this) is that it requires handing a handle to something to the circuit, which I was using Runtime for until I got to some of the tests that don't use any runtime at all. Ultimately, though, this would be a good change: it would simplify things while also giving operators uniform access to this information.
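A minimal std-only sketch of the two proposed simplifications: all workers share one Arc<AtomicBool>, and they are woken through JoinHandle::thread().unpark() instead of a separate Parker/Unparker pair. run_workers is a hypothetical name, and real workers would evaluate the circuit rather than just wait for the flag. Since unpark stores a wakeup token if the target thread is not parked yet, a worker that checks the flag just before it is set does not miss the wakeup.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Spawn `n` workers that park until the shared `shutdown` flag is set, then
/// set it, wake them with unpark, and return how many exited cleanly.
pub fn run_workers(n: usize) -> usize {
    let shutdown = Arc::new(AtomicBool::new(false));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let shutdown = Arc::clone(&shutdown);
            thread::spawn(move || {
                // park() may wake spuriously, so re-check the flag in a loop.
                while !shutdown.load(Ordering::Acquire) {
                    thread::park();
                }
                true
            })
        })
        .collect();

    shutdown.store(true, Ordering::Release);
    for handle in &handles {
        handle.thread().unpark();
    }
    handles.into_iter().map(|h| h.join().unwrap()).filter(|&ok| ok).count()
}
```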
serde_cbor is unmaintained
Details | |
---|---|
Status | unmaintained |
Package | serde_cbor |
Version | 0.11.2 |
URL | https://github.com/pyfisch/cbor |
Date | 2021-08-15 |
The serde_cbor
crate is unmaintained. The author has archived the github repository.
Alternatives proposed by the author:
See advisory page for additional details.
We currently have operators that take either owned values or references, but both just defer to an inner operator that explicitly handles reference and owned inputs separately. Generated code could take advantage of handling the two cases differently, e.g., to avoid allocations, which could be very useful.
Add variants of operators that accept one closure to handle reference values and another to handle owned values.
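A sketch of what such an operator variant could look like, assuming a plain struct rather than DBSP's operator traits; DualMap, eval, and eval_owned are hypothetical names. In the usage below, the owned closure uppercases the String in place and returns it, reusing its allocation, while the reference closure has to allocate a new String.

```rust
/// Hypothetical operator holding one closure for borrowed inputs and one for
/// owned inputs, so each path can be optimized separately.
pub struct DualMap<FR, FO> {
    on_ref: FR,
    on_owned: FO,
}

impl<FR, FO> DualMap<FR, FO> {
    pub fn new(on_ref: FR, on_owned: FO) -> Self {
        Self { on_ref, on_owned }
    }

    /// Used when the operator only has a reference to its input.
    pub fn eval<T, O>(&self, input: &T) -> O
    where
        FR: Fn(&T) -> O,
    {
        (self.on_ref)(input)
    }

    /// Used when the operator owns its input and may consume it.
    pub fn eval_owned<T, O>(&self, input: T) -> O
    where
        FO: Fn(T) -> O,
    {
        (self.on_owned)(input)
    }
}
```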
Similar to BatchReader, the TraceReader API exposes an iterator over all elements in the trace. The two APIs can be reconciled with a bit of work. This way, any operator that works on batches would also work on traces. For example, Join currently takes a batch as input. When using Join as part of join_incremental computation, one of the input batches is an integral of the input stream. With this change, we will be able to switch to integrating streams of (indexed) Z-sets into a trace instead of a batch, and everything would still work.
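A simplified sketch of the reconciliation, assuming a single hypothetical Reader trait that both a batch and a trace (modeled as a list of unmerged batches) implement. Concrete u32/String/i64 tuples stand in for generic keys, values, and weights, and the join is a plain illustration rather than DBSP's Join; the point is only that one join function accepts either kind of input.

```rust
use std::collections::BTreeMap;

/// Hypothetical unified read interface over (key, value, weight) tuples.
pub trait Reader {
    fn tuples(&self) -> Vec<(u32, String, i64)>;
}

/// A single batch of updates.
pub struct Batch(pub Vec<(u32, String, i64)>);

/// A trace, modeled here as a list of (unmerged) batches.
pub struct Trace(pub Vec<Batch>);

impl Reader for Batch {
    fn tuples(&self) -> Vec<(u32, String, i64)> {
        self.0.clone()
    }
}

impl Reader for Trace {
    fn tuples(&self) -> Vec<(u32, String, i64)> {
        self.0.iter().flat_map(|b| b.tuples()).collect()
    }
}

/// Equi-join on the key accepting any Reader on either side; weights multiply.
pub fn join<L: Reader, R: Reader>(left: &L, right: &R) -> Vec<(u32, String, String, i64)> {
    let mut index: BTreeMap<u32, Vec<(String, i64)>> = BTreeMap::new();
    for (k, v, w) in right.tuples() {
        index.entry(k).or_default().push((v, w));
    }
    let mut out = Vec::new();
    for (k, v1, w1) in left.tuples() {
        if let Some(matches) = index.get(&k) {
            for (v2, w2) in matches {
                out.push((k, v1.clone(), v2.clone(), w1 * w2));
            }
        }
    }
    out
}
```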
This is actually wrong: the aggregate function should be invoked even when the argument set is empty.
For example, in SQL, COUNT(*) returns 0 for an empty table rather than an empty collection.
We need to add a .count() operator that counts the number of values for any given key, e.g., (K, V).count() -> (K, isize).
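A sketch of the proposed count, assuming an indexed Z-set given as (key, value, weight) tuples; count and count_all are hypothetical names. Each tuple adds its weight to its key's count, and keys whose weights cancel out are dropped. count_all illustrates the COUNT(*) point above: a non-grouped aggregate always produces a value, 0 for empty input, whereas a per-key count yields no row for keys that do not appear.

```rust
use std::collections::BTreeMap;

/// Count values per key: each tuple contributes its weight to its key.
pub fn count<K: Ord + Clone, V>(tuples: &[(K, V, isize)]) -> Vec<(K, isize)> {
    let mut counts: BTreeMap<K, isize> = BTreeMap::new();
    for (k, _v, w) in tuples {
        *counts.entry(k.clone()).or_insert(0) += *w;
    }
    // Drop keys whose weights cancel out to zero.
    counts.into_iter().filter(|(_, c)| *c != 0).collect()
}

/// Non-grouped COUNT(*): always returns a value, 0 for an empty input.
pub fn count_all<T>(tuples: &[(T, isize)]) -> isize {
    tuples.iter().map(|(_, w)| *w).sum()
}
```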
The Operator::summary
method returns printable operator metadata as a string. We would like to use a more structured representation, but since metadata is operator-specific, we cannot use a pre-defined structure here. A dynamic representation (e.g., JSON) may be the sweet spot.
This will also allow us to eliminate the dependency on textwrap.
Instead of
circuit.add_unary_operator(Map::new(|x| f(x)), stream)
one should be able to write:
stream.map(|x| f(x))
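A toy sketch of how the method-style API can be layered over the explicit one, assuming simplified stand-ins: here a Stream is just the sequence of values it carries at successive clock ticks, and Circuit, UnaryOperator, and Map are hypothetical minimal types, not DBSP's. stream.map(f) simply builds a Map operator and passes it to add_unary_operator.

```rust
/// Hypothetical stream: the values it carries at successive clock ticks.
pub struct Stream<T>(pub Vec<T>);

/// Hypothetical unary operator interface.
pub trait UnaryOperator<I, O> {
    fn eval(&mut self, input: &I) -> O;
}

pub struct Map<F>(F);

impl<F> Map<F> {
    pub fn new(f: F) -> Self {
        Map(f)
    }
}

impl<I, O, F: FnMut(&I) -> O> UnaryOperator<I, O> for Map<F> {
    fn eval(&mut self, input: &I) -> O {
        (self.0)(input)
    }
}

pub struct Circuit;

impl Circuit {
    /// Low-level API: instantiate an operator and connect it to `stream`.
    pub fn add_unary_operator<I, O, Op: UnaryOperator<I, O>>(&self, mut op: Op, stream: &Stream<I>) -> Stream<O> {
        Stream(stream.0.iter().map(|x| op.eval(x)).collect())
    }
}

impl<T> Stream<T> {
    /// Sugar: stream.map(f) == circuit.add_unary_operator(Map::new(f), stream).
    pub fn map<O, F: FnMut(&T) -> O>(&self, f: F) -> Stream<O> {
        Circuit.add_unary_operator(Map::new(f), self)
    }
}
```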
Async is not a good name, as these operators are still synchronous as in "synchronous circuits".
Some proposed alternatives (but I think we can do better): "interface", "extern", "EnvironmentReader", "EventDriven", "notifier".
Ran into the following error (CyclicCircuit) trying to build a circuit; I'm fairly certain this should work.
Minimal reproducible example:
```rust
use dbsp::Circuit;

fn main() {
    let (_, _) = Circuit::build(|circuit| {
        let (action_stream, action_handle) = circuit.add_input_stream();
        let _ = circuit
            .iterate(|child| {
                let _ = action_stream
                    .apply_owned(|action: i32| action)
                    .delta0(child);
                Ok((move || Ok(true), ()))
            })
            .unwrap();
        action_handle
    })
    .unwrap();
}
```
Errors with the following on the current git (dbsp = { git = "https://github.com/vmware/database-stream-processor" }):
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: CyclicCircuit { node_id: GlobalNodeId([NodeId(1)]) }', crates/pact-server/src/main.rs:24:8
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
The integrate_nested method is only used in tests and should be removed along with Z1Nested.
Linear aggregates can be computed incrementally. Unfortunately, converting the output of a linear aggregate to a Z-set is no longer linear. The question is whether there's anything useful we can do with regular sets or perhaps with Z-sets with non-standard weights (since the output of a linear aggregate can be modeled as a Z-set whose weight type is the value of the aggregate). If so, we can expose the purely linear aggregation API; otherwise, it will have to be bundled with Z-set conversion.
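To make the non-linearity concrete, take SUM as the linear aggregate. The notation here is introduced for illustration: $Z[x]$ is the weight of value $x$ in a Z-set $Z$, $s$ is the SUM aggregate, and $\iota(v) = \{v \mapsto 1\}$ converts an aggregate value into a singleton Z-set.

```latex
\begin{aligned}
s(Z) &= \sum_x Z[x]\,x, \qquad s(Z_1 + Z_2) = s(Z_1) + s(Z_2) \quad \text{(linear)} \\
\iota(s(Z_1 + Z_2)) &= \{\, s(Z_1) + s(Z_2) \mapsto 1 \,\} \\
\iota(s(Z_1)) + \iota(s(Z_2)) &= \{\, s(Z_1) \mapsto 1,\; s(Z_2) \mapsto 1 \,\} \quad \text{if } s(Z_1) \neq s(Z_2)
\end{aligned}
```

For example, with $Z_1 = \{3 \mapsto 1\}$ and $Z_2 = \{4 \mapsto 1\}$, the first expression is $\{7 \mapsto 1\}$ and the second is $\{3 \mapsto 1, 4 \mapsto 1\}$, so $\iota \circ s$ is not linear even though $s$ is.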
The current API for assembling operators into circuits has poor ergonomics.
This is partially because it is intentionally low-level, designed to support maximal
flexibility as opposed to ease of use, and partially because it was built in
a bottom-up fashion, with little consideration for user experience.
My goal is to clean up the design where possible and also hide some low-level details
behind a less flexible but more user-friendly higher-level API.
Below is a summary of problems along with some ideas for addressing them, but first
I mention the aspects of the current design that work well and that I'd like to
preserve going forward.
The functional style of instantiating operators (the stream.map().join().distinct()
stuff), similar to DD, LINQ, and many other systems, is nice compared to manually
instantiating operators and connecting their outputs.
The operator caching feature allows creating high-level operators without introducing
a performance hit, e.g., stream.distinct_trace() internally calls stream.trace().
If another operator in the circuit calls stream.trace(), it will get a reference to
the same trace rather than creating a new one.
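A minimal sketch of operator caching, assuming a hypothetical Circuit that memoizes operator outputs in a HashMap keyed by (operator name, input stream id); Trace, trace, and distinct_trace are stand-ins, not DBSP's types. The second request for the same trace returns the cached Rc instead of building a new one.

```rust
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::rc::Rc;

/// Hypothetical stand-in for a trace built from a stream.
pub struct Trace {
    pub stream_id: usize,
}

/// Hypothetical circuit with an operator cache keyed by (operator, stream id).
#[derive(Default)]
pub struct Circuit {
    cache: RefCell<HashMap<(&'static str, usize), Rc<Trace>>>,
    pub traces_built: Cell<usize>,
}

impl Circuit {
    /// Return the trace of `stream_id`, building it only on first request.
    pub fn trace(&self, stream_id: usize) -> Rc<Trace> {
        self.cache
            .borrow_mut()
            .entry(("trace", stream_id))
            .or_insert_with(|| {
                self.traces_built.set(self.traces_built.get() + 1);
                Rc::new(Trace { stream_id })
            })
            .clone()
    }

    /// A higher-level operator built on trace(); the distinct logic itself is
    /// omitted, and the shared trace is returned only so reuse is observable.
    pub fn distinct_trace(&self, stream_id: usize) -> Rc<Trace> {
        self.trace(stream_id)
    }
}
```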
API requires type annotations. Most operators are generic over their return type, i.e.,
happy to return any type that implements the ZSet trait. The goal is to support multiple
ZSet implementations (e.g., in-memory and disk-backed), but the downside is that the
programmer often needs to explicitly specify the return type using type annotations.
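A small sketch of why return-type genericity forces annotations, using a hypothetical ZSetLike trait with two implementations standing in for in-memory and disk-backed Z-sets. Because Z appears only in the return type of filter_nonzero_keys, the compiler cannot infer it from the arguments.

```rust
/// Hypothetical trait standing in for "any type that implements ZSet".
pub trait ZSetLike: Sized {
    fn from_tuples(tuples: Vec<(u32, i64)>) -> Self;
    fn len(&self) -> usize;
}

/// Two hypothetical implementations, e.g., in-memory and disk-backed.
pub struct InMemory(Vec<(u32, i64)>);
pub struct DiskBacked(Vec<(u32, i64)>);

impl ZSetLike for InMemory {
    fn from_tuples(tuples: Vec<(u32, i64)>) -> Self {
        InMemory(tuples)
    }
    fn len(&self) -> usize {
        self.0.len()
    }
}

impl ZSetLike for DiskBacked {
    // A real disk-backed implementation would write the tuples to storage.
    fn from_tuples(tuples: Vec<(u32, i64)>) -> Self {
        DiskBacked(tuples)
    }
    fn len(&self) -> usize {
        self.0.len()
    }
}

/// An operator generic over its *return* type: the caller must pick Z.
pub fn filter_nonzero_keys<Z: ZSetLike>(input: &[(u32, i64)]) -> Z {
    Z::from_tuples(input.iter().copied().filter(|&(k, _)| k != 0).collect())
}
```

Writing let out = filter_nonzero_keys(&data); without an annotation or turbofish fails to compile with a "type annotations needed" error.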
Operator naming. Operator names are currently a complete mess. Names like
join_trace, for example, make zero sense to the user. Ignoring the current lame naming
scheme, a sensible naming convention is not easy to come up with, because DBSP
supports multiple flavors of the same operator. Linear operators (map, filter, index,
etc.) are easy, as the same implementation works in incremental and non-incremental cases
and for arbitrary nesting depth. join, aggregate, distinct, and other non-linear
operators are trickier. First, they have incremental and non-incremental versions.
We typically use the former in practice, but there are situations where non-incremental
operators are useful. Whether we use an _incremental suffix for incremental operators
or _non_incremental (or something to that effect) for non-incremental operators, this
will be easy to get wrong, since the type system currently doesn't distinguish between
streams of changes and streams that carry complete collections. An additional complication
is that some operators have incremental implementations specialized for different nesting
depths. The type system does not currently enforce the use of the correct implementation
for the given scope.
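One possible direction, sketched under the assumption that a stream's kind can be tracked with a phantom type parameter: Delta and Full are hypothetical markers, and values are plain per-tick i64 numbers rather than Z-sets. integrate is only callable on a stream of changes and differentiate only on a stream of complete values, so mixing them up becomes a compile-time error rather than a silent bug.

```rust
use std::marker::PhantomData;

/// Marker: the stream carries changes to a value at each clock tick.
pub struct Delta;
/// Marker: the stream carries the complete value at each clock tick.
pub struct Full;

/// Hypothetical stream of per-tick values whose kind is tracked in its type.
pub struct Stream<Kind> {
    pub values: Vec<i64>,
    _kind: PhantomData<Kind>,
}

impl Stream<Delta> {
    pub fn from_changes(values: Vec<i64>) -> Self {
        Stream { values, _kind: PhantomData }
    }

    /// Running sum: changes -> complete values. Only defined on Delta streams.
    pub fn integrate(&self) -> Stream<Full> {
        let mut acc: i64 = 0;
        let values = self.values.iter().map(|d| { acc += d; acc }).collect();
        Stream { values, _kind: PhantomData }
    }
}

impl Stream<Full> {
    /// Difference with the previous tick: complete values -> changes.
    pub fn differentiate(&self) -> Stream<Delta> {
        let mut prev: i64 = 0;
        let values = self
            .values
            .iter()
            .map(|&x| {
                let d = x - prev;
                prev = x;
                d
            })
            .collect();
        Stream { values, _kind: PhantomData }
    }
}
```

With this design, calling integrate() on a Stream<Full> does not compile, because no such method exists for that type.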
Trace-based API vs generic API. Operators like integrate() and integrate_nested()
work for any values that form a group, but are inefficient when working with Z-sets and
indexed Z-sets, which is what we care about 99% of the time. Users should instead
call methods of the trace-based API, e.g., integrate_trace. This is another potential
source of confusion.
Export API. The API to export a stream from a nested circuit is verbose and easy to
get wrong. First, the user must call stream.integrate_trace() inside the circuit to
sum up all deltas computed across all iterations of the circuit into a trace. Then, in the
parent circuit, the user must call consolidate to consolidate the trace into a single
batch.
Recursion API. Today, a recursive relation is created by manually instantiating a
DelayedFeedback operator and closing the loop using its .connect() method. In addition,
the user must explicitly apply distinct to the recursive relation to ensure convergence.
This feels verbose and low-level.
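A small sketch of why distinct matters for convergence, assuming Z-sets are modeled as BTreeMaps from (from, to) edges to weights and the recursion computes transitive closure by naive re-evaluation; step, distinct, and fixpoint are hypothetical names, and there is no DelayedFeedback operator here. On a cyclic graph the weights keep growing without distinct, so the iteration never reaches a fixed point; with distinct it converges.

```rust
use std::collections::BTreeMap;

/// A Z-set of edges: (from, to) -> weight.
type ZSet = BTreeMap<(u32, u32), i64>;

/// One recursive step: paths = edges + (paths joined with edges on the middle node).
fn step(edges: &ZSet, paths: &ZSet) -> ZSet {
    let mut out = edges.clone();
    for (&(a, b), &w1) in paths {
        for (&(c, d), &w2) in edges {
            if b == c {
                *out.entry((a, d)).or_insert(0) += w1 * w2;
            }
        }
    }
    out
}

/// distinct: every tuple with positive weight gets weight 1; others are dropped.
fn distinct(z: &ZSet) -> ZSet {
    z.iter().filter(|(_, w)| **w > 0).map(|(&k, _)| (k, 1)).collect()
}

/// Re-evaluate until the result stops changing, or give up after `max_iters`.
fn fixpoint(edges: &ZSet, use_distinct: bool, max_iters: usize) -> Option<ZSet> {
    let mut paths = ZSet::new();
    for _ in 0..max_iters {
        let mut next = step(edges, &paths);
        if use_distinct {
            next = distinct(&next);
        }
        if next == paths {
            return Some(paths);
        }
        paths = next;
    }
    None
}
```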
Documentation. Once the above issues have been addressed, the resulting API
needs nice top down documentation with examples.
This is useful for computing things like counts and averages.
UnsafeCell was a premature optimization. Its use is not well encapsulated, and it likely enables unsafe behavior.
DBSP should be able to efficiently handle incremental joins that do not use just field equality; they are still bilinear operators.
The current API based on indexes does not allow expressing non-equi joins.
As a side note, that's something DDlog probably can't implement efficiently.
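A sketch of why non-equi joins remain incremental, assuming Z-sets modeled as lists of (value, weight) pairs and an arbitrary predicate; theta_join and incremental_delta are hypothetical names. Bilinearity gives (a + da) ⋈ (b + db) - a ⋈ b = da ⋈ db + a ⋈ db + da ⋈ b for any predicate, so the change can be computed from the deltas and the previous inputs. This nested-loop version is quadratic; making such joins efficient is the open question raised above.

```rust
use std::collections::BTreeMap;

/// A Z-set as an unconsolidated list of (value, weight) pairs.
type ZSet = Vec<(i64, i64)>;

/// Join on an arbitrary predicate; output weights are products of input weights.
fn theta_join<P: Fn(i64, i64) -> bool>(a: &ZSet, b: &ZSet, pred: &P) -> Vec<((i64, i64), i64)> {
    let mut out = Vec::new();
    for &(x, wx) in a {
        for &(y, wy) in b {
            if pred(x, y) {
                out.push(((x, y), wx * wy));
            }
        }
    }
    out
}

/// Sum weights per output tuple and drop zeros so results can be compared.
fn consolidate(tuples: Vec<((i64, i64), i64)>) -> BTreeMap<(i64, i64), i64> {
    let mut m = BTreeMap::new();
    for (k, w) in tuples {
        *m.entry(k).or_insert(0) += w;
    }
    m.retain(|_, w| *w != 0);
    m
}

/// Change of the join when a grows by da and b by db, via bilinearity:
/// da ⋈ db + a ⋈ db + da ⋈ b.
fn incremental_delta<P: Fn(i64, i64) -> bool>(
    a: &ZSet,
    da: &ZSet,
    b: &ZSet,
    db: &ZSet,
    pred: &P,
) -> BTreeMap<(i64, i64), i64> {
    let mut all = theta_join(da, db, pred);
    all.extend(theta_join(a, db, pred));
    all.extend(theta_join(da, b, pred));
    consolidate(all)
}
```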
The design for traces that we inherited from DD can stop an in-progress merge of two batches if it runs out of fuel halfway through the merge. Such an in-progress merge can use 2x memory since it keeps the original batches around until the merge completes. I wonder how important the optimization is and whether we can make things simpler and more memory-efficient by always running each merge to completion.
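A sketch of a fuel-limited merge, assuming batches are sorted Vec<u64>s without weights or consolidation; FueledMerge, work, and finish are hypothetical names, not DD's or DBSP's. It illustrates the memory cost in question: while the merge is paused, both inputs and the partial output are alive at once, roughly 2x the data.

```rust
/// Hypothetical resumable merge of two sorted batches: `work` advances at most
/// `fuel` steps, and both inputs stay alive until the merge finishes.
pub struct FueledMerge {
    left: Vec<u64>,
    right: Vec<u64>,
    i: usize,
    j: usize,
    out: Vec<u64>,
}

impl FueledMerge {
    pub fn new(left: Vec<u64>, right: Vec<u64>) -> Self {
        let cap = left.len() + right.len();
        Self { left, right, i: 0, j: 0, out: Vec::with_capacity(cap) }
    }

    /// Perform up to `fuel` merge steps; returns the unused fuel.
    pub fn work(&mut self, mut fuel: usize) -> usize {
        while fuel > 0 && !self.is_done() {
            let take_left = self.j >= self.right.len()
                || (self.i < self.left.len() && self.left[self.i] <= self.right[self.j]);
            if take_left {
                self.out.push(self.left[self.i]);
                self.i += 1;
            } else {
                self.out.push(self.right[self.j]);
                self.j += 1;
            }
            fuel -= 1;
        }
        fuel
    }

    pub fn is_done(&self) -> bool {
        self.i == self.left.len() && self.j == self.right.len()
    }

    /// Consume a finished merge; only now can the inputs be dropped.
    pub fn finish(self) -> Option<Vec<u64>> {
        if self.is_done() { Some(self.out) } else { None }
    }
}
```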
HashMap doesn't allow creating an entry from a reference to a key (this feature is currently unstable and looks like it won't be stabilized any time soon). HashMap is just a wrapper around hashbrown, which does support this. I suggest we use hashbrown directly to get better performance.
@mbudiu-vmw, @Kixiron, what do you think?
We would like to be able to add untimed batches (Batch<Time=()>) to a timed trace. Currently this requires re-constructing the batch by adding the same timestamp value to each tuple. Instead, we could implement a new batch type that stores a single copy of the timestamp and have traces access such batches. A regular batch is produced only when two batches with different timestamps are merged. This requires a trace implementation built on top of at least two different batch types.
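A sketch of the proposed batch type, assuming a u32 timestamp and i64 weights; TimedBatch, from_untimed, and merge are hypothetical names. An untimed batch is wrapped by storing its timestamp once, and only a merge of batches with different timestamps falls back to the general per-tuple form. Consolidation is omitted.

```rust
/// Hypothetical batch that stores one shared timestamp when all tuples have the same time.
#[derive(Debug, PartialEq)]
pub enum TimedBatch<D> {
    /// Every tuple shares `time`, which is stored once.
    Uniform { time: u32, data: Vec<(D, i64)> },
    /// General form: each tuple carries its own time.
    General(Vec<(D, u32, i64)>),
}

impl<D> TimedBatch<D> {
    /// Wrap an untimed batch without copying the timestamp into each tuple.
    pub fn from_untimed(time: u32, data: Vec<(D, i64)>) -> Self {
        TimedBatch::Uniform { time, data }
    }

    fn into_general(self) -> Vec<(D, u32, i64)> {
        match self {
            TimedBatch::Uniform { time, data } => data.into_iter().map(|(d, w)| (d, time, w)).collect(),
            TimedBatch::General(v) => v,
        }
    }

    /// Merge two batches; the result stays Uniform only when both share the same time.
    pub fn merge(self, other: Self) -> Self {
        match (self, other) {
            (TimedBatch::Uniform { time: t1, mut data }, TimedBatch::Uniform { time: t2, data: d2 }) if t1 == t2 => {
                data.extend(d2);
                TimedBatch::Uniform { time: t1, data }
            }
            (a, b) => {
                let mut v = a.into_general();
                v.extend(b.into_general());
                TimedBatch::General(v)
            }
        }
    }
}
```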
Add more doc examples when we have some interesting relational operators.
We discuss strategies for parallelizing the execution of DBSP circuits.
NOTE: This is orthogonal to the question of speeding up individual operators
using SIMD instructions or GPUs.
The high-level goal is to shard processing across multiple worker threads while
minimizing bottlenecks due to thread communication and synchronization. As a
starting point for the design, the DD architecture where each thread runs its
own copy of the circuit seems suitable. These copies happen to have identical
topologies, i.e., contain the same operators and subcircuits connected in the
same way (not strictly required, but I have a hard time imagining why we would
want something else).
Fragments of the circuit may execute locally without any inter-thread
communication; e.g., assuming input tables have been sharded, a sequence of
linear operators, including group_by and linear aggregates, can execute
locally, with results from multiple workers assembled at the end of the pipeline.
Other operators, most notably joins, may require communication, but even that
can sometimes be avoided; e.g., R1(x,y), R2(y,z) can be evaluated locally
assuming R1 and R2 are sharded so that records that share the same value of
y are produced by the same worker. With DDlog we've seen workloads that in
theory could have been partitioned based on some attribute, with the bulk of
processing occurring without cross-worker communication. But of course we
couldn't take advantage of this when running on top of DD.
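A sketch of the local-evaluation argument for R1(x,y), R2(y,z), assuming both relations are sharded by hashing y; shard_of, join, and sharded_join are hypothetical names, and each "worker" is just an index into a vector rather than a thread. Because matching tuples always land in the same shard, joining each shard locally and concatenating the results gives the same answer as a global join.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Pick a worker for join attribute `y`.
fn shard_of(y: u32, workers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    y.hash(&mut hasher);
    (hasher.finish() % workers as u64) as usize
}

/// Join R1(x, y) with R2(y, z) on y, producing (x, y, z).
fn join(r1: &[(u32, u32)], r2: &[(u32, u32)]) -> Vec<(u32, u32, u32)> {
    let mut out = Vec::new();
    for &(x, y1) in r1 {
        for &(y2, z) in r2 {
            if y1 == y2 {
                out.push((x, y1, z));
            }
        }
    }
    out
}

/// Shard both relations on y, join each shard locally (no cross-worker
/// communication), and concatenate the per-worker results.
fn sharded_join(r1: &[(u32, u32)], r2: &[(u32, u32)], workers: usize) -> Vec<(u32, u32, u32)> {
    let mut s1 = vec![Vec::new(); workers];
    let mut s2 = vec![Vec::new(); workers];
    for &(x, y) in r1 {
        s1[shard_of(y, workers)].push((x, y));
    }
    for &(y, z) in r2 {
        s2[shard_of(y, workers)].push((y, z));
    }
    (0..workers).flat_map(|w| join(&s1[w], &s2[w])).collect()
}
```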
We therefore do not want to automatically shard all operators or all streams.
Instead, sharding decisions should be left to the compiler or programmer. A
modular way to achieve this is to encapsulate sharding inside special
Exchange operators. Such an operator reads locally computed values from its
input stream, distributes them across its peers in other workers based on a
sharding function, and outputs the set of values received from peers:
Exchange
┌───────┐
┌───────┐ │ │
│ op 1 ├──────────────┐ │ │
└───────┘ ▲ │ │
┌───────┐ │ │ ┌───────┐
│ op 3 ├─────────┼─┬──┬──┼─────────►│ op 4 │
└───────┘ │ │ │ │ └───────┘
┌───────┐ ▲ │ │ │ │
│ op 2 ├──────────────┘ │ │ │ │
└───────┘ │ │ │ │
WORKER 1 │ │ │ │
─────────────────────────────────────────────────┼─┼──┼──┼──────────────────────────
WORKER 2 │ │ │ │
┌───────┐ │ │ │ │
│ op 1 ├──────────────┐ │ │ │ │
└───────┘ ▼ │ │ │ │
┌───────┐ │ │ │ │ ┌───────┐
│ op 3 ├─────────┼─┴──┴──┼─────────►│ op 4 │
└───────┘ │ │ └───────┘
┌───────┐ ▲ │ │
│ op 2 ├──────────────┘ │ │
└───────┘ └───────┘
An interesting question is: how are Exchange operators scheduled? The simplest
approach is to stick to static scheduling. When an Exchange operator is
evaluated, it distributes its inputs across peer mailboxes and then blocks
waiting for all peers to put values in its incoming mailboxes. This means that
the whole pipeline will run at the speed of the slowest worker.
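A minimal sketch of this simple blocking Exchange, assuming each worker is a std thread, mailboxes are mpsc channels, and the sharding function is value mod N; exchange_round is a hypothetical name. Every worker sends one (possibly empty) batch to each peer, including itself, then blocks until it has received a batch from every peer, so the round completes only once the slowest worker has sent its data.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// One Exchange round across `inputs.len()` workers; returns what each worker
/// received, sorted for easy comparison.
pub fn exchange_round(inputs: Vec<Vec<u64>>) -> Vec<Vec<u64>> {
    let n = inputs.len();
    let (txs, rxs): (Vec<Sender<Vec<u64>>>, Vec<Receiver<Vec<u64>>>) =
        (0..n).map(|_| channel()).unzip();
    let handles: Vec<_> = inputs
        .into_iter()
        .zip(rxs)
        .map(|(local, rx)| {
            let txs = txs.clone();
            thread::spawn(move || {
                // Input half: partition local values by the sharding function.
                let mut outgoing = vec![Vec::new(); n];
                for v in local {
                    outgoing[(v % n as u64) as usize].push(v);
                }
                for (peer, batch) in outgoing.into_iter().enumerate() {
                    txs[peer].send(batch).unwrap();
                }
                // Output half: block until every peer has sent its batch.
                let mut received: Vec<u64> = (0..n).flat_map(|_| rx.recv().unwrap()).collect();
                received.sort_unstable();
                received
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```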
As an optimization, we can implement communication-aware static scheduling: (1)
split Exchange operators into the input half that writes data to peer mailboxes
and the output half that collects data from peers, and (2) compute a static schedule
that tries to evaluate all input halves as early as possible and all output
halves as late as possible, to give stragglers time to catch up and reduce
waiting times.
Finding an optimal static schedule can be hard or impossible. Therefore, the
next option is to implement a dynamic scheduler that schedules the output halves
of Exchange operators as they become ready. This will require a new mechanism
for an operator to inform the scheduler about its status. Even with this
design, synchronization will be a bottleneck for some workloads.
We therefore consider a more radical solution that takes advantage of
incremental computation to reduce data dependencies between workers. Rather
than waiting for inputs from all peers to become available, Exchange operators
can be placed in an iterative scope. At each nested clock cycle, an Exchange
operator yields new inputs received from its peers. The circuit processes these
inputs before reading more inputs at the next iteration. Iteration continues
until all data has been sent and received.
We don't need to choose one of the above options. There's enough modularity in
the design of the framework to support all or some of them. We can start with
implementing a simple (non-iterative) exchange operator and using it with a
static scheduler. As the next step, we add dynamic scheduling and see how much
difference it makes in benchmarks. We can experiment with iterative Exchange
operators if we still see bottlenecks.
Currently pretty much every operator expects a sorted & consolidated collection and produces a sorted & consolidated collection, but this work is very often redundant. Ephemeral operators like .filter() or .map() don't care about the order or consolidated-ness of their inputs or outputs, so there's no need to always marshal their inputs/outputs into OrdZSet and the like; they can consume arbitrary inputs and produce unordered batches of values.
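A sketch of the redundancy, assuming (value, weight) pairs; consolidate, eager, and lazy are hypothetical names. Because map and filter are linear, running them on unordered, unconsolidated data and consolidating once at the end gives the same result as consolidating after every operator.

```rust
/// Sort by value and sum the weights of equal values, dropping zero weights.
fn consolidate(mut tuples: Vec<(i64, i64)>) -> Vec<(i64, i64)> {
    tuples.sort_unstable_by_key(|&(x, _)| x);
    let mut out: Vec<(i64, i64)> = Vec::with_capacity(tuples.len());
    for (x, w) in tuples {
        if let Some(last) = out.last_mut() {
            if last.0 == x {
                last.1 += w;
                continue;
            }
        }
        out.push((x, w));
    }
    out.retain(|&(_, w)| w != 0);
    out
}

/// Consolidates after every operator, as most operators do today.
fn eager(input: Vec<(i64, i64)>) -> Vec<(i64, i64)> {
    let mapped = consolidate(input.into_iter().map(|(x, w)| (x / 2, w)).collect());
    consolidate(mapped.into_iter().filter(|&(x, _)| x > 0).collect())
}

/// Lets map and filter run on unordered data and consolidates only once.
fn lazy(input: Vec<(i64, i64)>) -> Vec<(i64, i64)> {
    consolidate(input.into_iter().map(|(x, w)| (x / 2, w)).filter(|&(x, _)| x > 0).collect())
}
```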
Operator names are currently a mess; for example, join_trace is really an incremental join on nested streams. Same with distinct_trace. We need some kind of naming system that encodes what operators do instead of how they are implemented.
Currently we have two very strict issue templates that not all issues fit into; we should add one that gives users the freedom to write whatever they want.
For example, we have join and join_incremental, distinct and distinct_incremental, but aggregate and stream_aggregate.
A lot of our sorting and more general consolidation could be done faster via SIMD. This would be a huge win if we got it working, as sorting currently takes a significant part of our runtime.
See, for instance, the q4 query in the nexmark benchmark.
We are only using a couple of items from timely: the PartialOrder trait and the Antichain type. Copy them to DBSP to completely eliminate the dependency.
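A minimal sketch of the two items, written from scratch rather than copied from timely and assuming a product order on pairs such as (outer time, iteration); the method names mirror timely's less_equal/less_than, but this code is hypothetical. An Antichain keeps only mutually incomparable elements: inserting an element that some member is already less than or equal to is a no-op, and inserting a smaller element evicts the members it dominates.

```rust
/// Minimal partial-order trait (modeled on, not copied from, timely's).
pub trait PartialOrder: PartialEq {
    fn less_equal(&self, other: &Self) -> bool;
    fn less_than(&self, other: &Self) -> bool {
        self.less_equal(other) && self != other
    }
}

impl PartialOrder for u64 {
    fn less_equal(&self, other: &Self) -> bool {
        self <= other
    }
}

/// Product order on pairs, e.g., (outer time, inner iteration).
impl<A: PartialOrder, B: PartialOrder> PartialOrder for (A, B) {
    fn less_equal(&self, other: &Self) -> bool {
        self.0.less_equal(&other.0) && self.1.less_equal(&other.1)
    }
}

/// The minimal elements inserted so far; members are mutually incomparable.
#[derive(Debug)]
pub struct Antichain<T> {
    elements: Vec<T>,
}

impl<T: PartialOrder> Antichain<T> {
    pub fn new() -> Self {
        Antichain { elements: Vec::new() }
    }

    /// Insert `t` unless some member is <= t; evict members that t is <= to.
    /// Returns true if `t` was inserted.
    pub fn insert(&mut self, t: T) -> bool {
        if self.elements.iter().any(|e| e.less_equal(&t)) {
            return false;
        }
        self.elements.retain(|e| !t.less_equal(e));
        self.elements.push(t);
        true
    }

    pub fn elements(&self) -> &[T] {
        &self.elements
    }
}
```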
LDBC Benchmarks are a great source of tests that really hone in on the things we need to benchmark:
These are a really good mix of both static and streaming datasets, which is exactly what we need.
Prior art from Frank McSherry w/ DDFlow