hydro-project / hydroflow
Hydro's low-level dataflow runtime
Home Page: https://hydro.run/docs/hydroflow/
License: Apache License 2.0
Broken by/since fc48b71:

```
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Uncategorized, error: "failed to lookup address information: nodename nor servname provided, or not known" }', covid_tracing_dist/src/tracker.rs:117:10
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
```
To repro: the test below will echo what you type, as expected. But if you uncomment the line with `fold`, it will not.
```rust
pub async fn test_strata_with_stdin() {
    let reader = tokio::io::BufReader::new(tokio::io::stdin());
    let stdin_lines =
        tokio_stream::wrappers::LinesStream::new(tokio::io::AsyncBufReadExt::lines(reader));
    let mut hf = hydroflow_syntax! {
        recv_iter(vec![1,2,3])
            // -> fold(0, |a,b| a + 1)
            -> for_each(|x| println!("There are {} items", x));
        recv_stream(stdin_lines)
            -> map(|l: Result<std::string::String, std::io::Error>| l.unwrap())
            -> for_each(|s| println!("Echo: {:?}", s));
    };
    tokio::select! {
        _ = hf.run_async() => (),
        _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => (),
    };
}
```
An interesting decision is whether we represent arithmetic as an infinite relation (pure, but hard) or as a custom extension to the language (not Datalog, but easy to implement). There are two main cases where arithmetic is used:
So I don't think there's a big immediate need for the infinite relations since the language extensions can support the above too without too much effort.
@davidchuyaya thoughts, do the above two cases cover your uses?
Rather than have the state API be aware of ticks and do clobbering, provide a simple map-esque wrapper class where the keys are ticks/epochs. Each tick advancement resets the value.
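A minimal sketch of what such a wrapper might look like (the names here are hypothetical, not an existing Hydroflow API): a cell that tags its value with the tick it was written in, and resets to the default whenever a later tick touches it.

```rust
// Hypothetical sketch of a tick-scoped state cell: the stored value is
// tagged with the tick (epoch) it was written in, and any access from a
// different tick discards the stale value and starts from `default`.
#[derive(Debug)]
pub struct TickCell<T: Default> {
    tick: u64,
    value: T,
}

impl<T: Default> TickCell<T> {
    pub fn new() -> Self {
        Self { tick: 0, value: T::default() }
    }

    /// Get a mutable reference to the value for `tick`, clobbering any
    /// value left over from an earlier tick.
    pub fn get_mut(&mut self, tick: u64) -> &mut T {
        if tick != self.tick {
            self.tick = tick;
            self.value = T::default();
        }
        &mut self.value
    }
}

fn main() {
    let mut counter: TickCell<u32> = TickCell::new();
    *counter.get_mut(0) += 1;
    *counter.get_mut(0) += 1;
    assert_eq!(*counter.get_mut(0), 2); // same tick: state accumulates
    assert_eq!(*counter.get_mut(1), 0); // new tick: state was reset
    println!("ok");
}
```

Operators would then never see a stale value from a previous tick, without the state API itself needing to know about clobbering.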
One way that might be cool to test scheduling code like this would be to introduce some kind of logging that records whenever an operator is scheduled, and then (via datadriven or some other means) compare or eyeball that against what is expected. In the past I've found that kind of thing helpful for testing behaviors whose expected results are hard to pre-define, but where you know an output is correct when you see it. (I think this test is fine for now, though.)
There's really no benefit to having users pick the indexes for these.
I found it inconvenient and unnecessarily "Rusty" to think about how to pass static config information (e.g. command line options) into flows with the right ownership. I think it would be nice to have a handy Hydroflow API where we register relevant static variables that are accessible read-only in flows.
A fancier version of this would be to have scoping of such things within the flow ... not even sure how to think about that. Global is OK by me for now.
This is an example of typical Rust gotchas that we can shield noobs from so they just focus on writing their pipelines. There are likely many others.
It should be very easy to wire up a YAML or JSON file, a Vec of internal data, a DB iterator.
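One possible shape for the read-only static registry, sketched here with std's OnceLock (the `Config` struct and `config()` accessor are hypothetical, not an existing Hydroflow API):

```rust
use std::sync::OnceLock;

// Hypothetical sketch: register static config once at startup; closures in
// flows then read it through a cheap accessor, with no ownership gymnastics.
#[derive(Debug)]
struct Config {
    addr: String,
    batch_size: usize,
}

static CONFIG: OnceLock<Config> = OnceLock::new();

fn config() -> &'static Config {
    CONFIG.get().expect("config not registered")
}

fn main() {
    // e.g. parsed from command-line options at startup:
    CONFIG
        .set(Config { addr: "127.0.0.1:8080".into(), batch_size: 64 })
        .unwrap();

    // Closures inside a flow can read the config without capturing anything:
    let filter = |n: &usize| *n < config().batch_size;
    assert!(filter(&10));
    assert!(!filter(&100));
    println!("ok");
}
```

Globals like this sidestep the ownership questions entirely; scoping within a flow would need something richer.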
-> map(|n| (n..=n+1))
vec![0] is a single-element vec literal.
However, if instead iterators were push-based
Requires generating nested joins, and we may want to investigate doing some lightweight query optimizations.
Example program: detecting triangles:
triangle(a, b, c) :- edge(a, b), edge(b, c), edge(c, a)
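The nested joins this rule expands into can be sketched in plain Rust (a hash join on the shared variables; this is illustrative, not Hydroflow code). Note that each triangle appears once per rotation of its vertices:

```rust
use std::collections::{HashMap, HashSet};

// Plain-Rust sketch of the nested joins for:
// triangle(a,b,c) :- edge(a,b), edge(b,c), edge(c,a).
fn triangles(edges: &[(u32, u32)]) -> Vec<(u32, u32, u32)> {
    // Index edges by source node for the join probes.
    let mut by_src: HashMap<u32, Vec<u32>> = HashMap::new();
    for &(a, b) in edges {
        by_src.entry(a).or_default().push(b);
    }
    let edge_set: HashSet<(u32, u32)> = edges.iter().copied().collect();

    let mut out = Vec::new();
    for &(a, b) in edges {
        // join edge(a,b) with edge(b,c) on b ...
        if let Some(cs) = by_src.get(&b) {
            for &c in cs {
                // ... then with edge(c,a) on (c, a).
                if edge_set.contains(&(c, a)) {
                    out.push((a, b, c));
                }
            }
        }
    }
    out
}

fn main() {
    let edges = [(1, 2), (2, 3), (3, 1), (3, 4)];
    // One triangle, reported once per rotation:
    assert_eq!(triangles(&edges), vec![(1, 2, 3), (2, 3, 1), (3, 1, 2)]);
    println!("{:?}", triangles(&edges));
}
```

An optimizer could, e.g., pick the join order by estimated selectivity or dedup rotations, which is where the lightweight query optimization would come in.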
Are they meaningfully different? Can we make it easier to understand? Otherwise, document better.
for stdin, and for network input without an extra await
Right now, if you create an operator that gives you an input or output port you don't care about, you still have to attach it to something or else you get an unattached handoff error. It would be nice if there was a better way to do that than by constructing a sink that no-op drains the handoff.
For example, if we have max() or min(), a preceding handoff would only ever need to store a single max or min element, streaming. But currently we only have VecHandoffs, which store everything. So we can be more efficient. This will probably tie in to using lattice types in handoffs.
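As an illustration, a max()-specialized handoff could fold each incoming element into a single running maximum instead of buffering everything. This is a plain-Rust sketch, not the actual Handoff trait:

```rust
// Sketch of a "bounded" handoff specialized for max(): instead of buffering
// every element like a VecHandoff, it folds incoming items into a single
// running maximum, so the buffer never holds more than one value.
#[derive(Default)]
struct MaxHandoff<T: Ord + Default> {
    current: Option<T>,
}

impl<T: Ord + Default> MaxHandoff<T> {
    fn give(&mut self, item: T) {
        // Keep only the larger of the stored value and the new item.
        if self.current.as_ref().map_or(true, |cur| item > *cur) {
            self.current = Some(item);
        }
    }
    fn take(&mut self) -> Option<T> {
        self.current.take()
    }
}

fn main() {
    let mut h = MaxHandoff::default();
    for x in [3, 9, 4, 1] {
        h.give(x);
    }
    assert_eq!(h.take(), Some(9)); // only one element was ever stored
    assert_eq!(h.take(), None);
    println!("ok");
}
```

The `give` step is exactly the lattice join for the max lattice, which is why lattice-typed handoffs seem like the natural generalization.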
Will require the insertion of barriers, which I don't really know how to do in HF.
e.g. to wrap inbound_tcp_vertex_port and outbound_tcp_vertex_port
A common pattern in Dedalus programs is to persist some relation into the future via an inductive rule:
q(X)@next :- q(X).
Translated naively to Hydroflow today, this would result in draining a buffer only to re-fill it the same way on the next iteration. We probably want some mechanism to allow us to designate a relation as "persisted" and thus not drained at the end of a tick.
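A plain-Rust sketch of the contrast (not actual Hydroflow internals): the naive translation drains and rebuilds the buffer every tick, while a "persisted" relation would simply be left alone at end-of-tick.

```rust
// Sketch contrasting the naive translation of `q(X)@next :- q(X).` with a
// relation marked as persisted.
fn main() {
    // Naive: drain q at end of tick, then re-insert everything identically.
    let mut q = vec![1, 2, 3];
    let drained: Vec<i32> = q.drain(..).collect(); // pure churn
    q.extend(drained);

    // Persisted: end-of-tick leaves the buffer untouched; no work at all.
    let q_persisted = vec![1, 2, 3];

    assert_eq!(q, q_persisted); // same contents, very different cost
    println!("ok");
}
```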
This issue is to acknowledge two needs I see: recognize inductive rules like
q(X)@next :- q(X).
and translate them into a form that can be handled more efficiently.

This issue will serve as a central place for comments on the surface syntax and its usability.
While this can be achieved with reduce, it would be handy.
A list of some stuff we discussed today while working through exchange.
Having to do fancy type-level list concatenation to write an operator is Not So Much Fun. It definitely gives us a lot of power and safety, but the amount the average user is exposed to it today is unfortunate, especially because type-level lists don't feel (to me, at least) like an essential part of the type I'm describing when I write a function.
This might just be my inexperience with explicit lifetimes, but there's quite a lot of machinery to go through to get something working. This one I could see getting resolved once we have sufficiently many examples, though.
This should be its own issue for discussion, perhaps.
Can we easily thread metadata through a pipeline here?
We sort of want something like the tokio streams, kind of looking like this:
enum Msg<T> {
PartialStop,
TotalStop,
Data(T)
}
but as it is, the iterators expect Options, and it's not clear how to pass these variants through the tree. Another option would be to have a parallel tree that manages the metadata, but that seems hard and difficult to make safe.
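A small sketch of the mismatch: squeezing Msg through an Option-shaped pipeline silently drops the control-flow variants.

```rust
// Standard Iterator only yields Some(item) / None, so the control-flow
// variants of Msg have nowhere to go without either flattening them away
// or threading them out-of-band.
enum Msg<T> {
    PartialStop,
    TotalStop,
    Data(T),
}

fn main() {
    let msgs = vec![Msg::Data(1), Msg::PartialStop, Msg::Data(2), Msg::TotalStop];

    // Forcing Msg through an Option-shaped pipeline loses the metadata:
    let data: Vec<i32> = msgs
        .into_iter()
        .filter_map(|m| match m {
            Msg::Data(x) => Some(x),
            Msg::PartialStop | Msg::TotalStop => None, // punctuation lost!
        })
        .collect();
    assert_eq!(data, vec![1, 2]);
    println!("{:?}", data);
}
```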
E.g. we just added flatten and null, and they're not documented. How do we encourage this?
Right now there is not anything that reschedules generic source subgraphs. Also check how the context allows scheduling.
Would be nice for debugging to have an easy syntax to mark a hydroflow variable "debug" and have its contents teed to stdout or stderr.
I.e. suppose I have

```
message_generator = recv_iter([1,2,3]) -> ... foo(...) -> sink_async(..);
```

I would want:

```
message_generator_prep = recv_iter([1,2,3]) -> ... foo(...) -> tee();
message_generator_prep[0] -> for_each(|m| println!("message_generator: {:?}", m));
message_generator = message_generator_prep[1] -> sink_async(..);
```
An idling Hydroflow instance makes my (and Mingwei's) fans go crazy, taking up 100% CPU. We should fix that.
The hydroflow parser chooses the left and right sides of the join based on the order in which the join inputs appear in the text, not based on the input index. I.e. the two cases below differ only in the order of the lines, yet only the first parses correctly.
```rust
pub fn test_join_order() {
    let mut df_good = hydroflow_syntax! {
        yikes = join() -> for_each(|m: ((), (u32, String))| println!("{:?}", m));
        recv_iter([0,1,2]) -> map(|i| ((), i)) -> [0]yikes;
        recv_iter(["a".to_string(),"b".to_string(),"c".to_string()]) -> map(|s| ((), s)) -> [1]yikes;
    };
    let mut df_bad = hydroflow_syntax! {
        yikes = join() -> for_each(|m: ((), (u32, String))| println!("{:?}", m));
        recv_iter(["a".to_string(),"b".to_string(),"c".to_string()]) -> map(|s| ((), s)) -> [1]yikes;
        recv_iter([0,1,2]) -> map(|i| ((), i)) -> [0]yikes;
    };
}
```
It would be nice if there were a simple semantic for explicitly vs. non-explicitly stratified subgraphs. This one (where they are always at stratum 0) seems fine to me, but another approach that sounds equally reasonable is that non-explicitly stratified operators are always eligible to run. Not sure if we have a preference, but it seems like something we should decide on.
... :- foo(a, a)
implies a filter on the rows coming from foo. We need to handle this appropriately.
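A sketch of the intended translation: the repeated variable becomes an equality filter plus a projection rather than a join (plain Rust, illustrative only, not generated code):

```rust
// A repeated variable in a body atom, e.g. `p(a) :- foo(a, a)`, compiles
// to an equality filter over the rows of `foo`, then a projection.
fn main() {
    let foo = vec![(1u32, 1u32), (1, 2), (3, 3)];

    // foo(a, a): keep only rows whose two columns are equal, project `a`.
    let p: Vec<u32> = foo
        .into_iter()
        .filter(|(x, y)| x == y)
        .map(|(x, _)| x)
        .collect();

    assert_eq!(p, vec![1, 3]);
    println!("{:?}", p);
}
```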
tee_consumer? after_tee?
CanReceive<T> type: previously we used this to allow submitting multiple types, but it adds roughness since we have to specify the T when using the surface API.
flush() on inputs is an easy footgun.
flatten() instead of .flat_map(std::convert::identity).
In Bloom there is a "bootstrap" block which can be used to run code before tick 0. Similarly datalog has fixed EDB code. We have to figure out how to setup/schedule this.
Ad-hoc, this can be done with arbitrary Rust in fn main(), but maybe we want to be more principled.
Not sure why; seems to be something around reading lines from the terminal?
It's annoying to have to map things to () keys to achieve a cross-join.
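The workaround in question, sketched in plain Rust: map both sides to the unit key (), after which a keyed join on () degenerates into the full cross product.

```rust
// To cross-join two streams with a keyed join(), every row is mapped to
// the unit key (), joined, and then the key is thrown away again.
fn main() {
    let xs = [1u32, 2];
    let ys = ["a", "b"];

    // Map each side to ((), value)...
    let left: Vec<((), u32)> = xs.iter().map(|&x| ((), x)).collect();
    let right: Vec<((), &str)> = ys.iter().map(|&y| ((), y)).collect();

    // ...so a join on () produces the full cross product.
    let mut cross = Vec::new();
    for &((), x) in &left {
        for &((), y) in &right {
            cross.push((x, y));
        }
    }
    assert_eq!(cross, vec![(1, "a"), (1, "b"), (2, "a"), (2, "b")]);
    println!("{:?}", cross);
}
```

A dedicated cross_join operator would hide the dummy-key plumbing entirely.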
@davidchuyaya's protocols often include "relations" defined by a boolean expression, which can be joined with to introduce filters.
I'm imagining a syntax like:
... :- ..., { a + b > 3 }
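A sketch of how such a boolean-expression "relation" could compile: rather than an actual join, it becomes a filter over the already-joined tuples (plain Rust, illustrative only):

```rust
// A body term like `{ a + b > 3 }` applied to tuples (a, b): joining with
// the boolean "relation" is equivalent to filtering by the predicate.
fn main() {
    let rows = vec![(1, 1), (2, 2), (5, 0)];

    let kept: Vec<(i32, i32)> = rows
        .into_iter()
        .filter(|(a, b)| a + b > 3) // the "relation" as a filter
        .collect();

    assert_eq!(kept, vec![(2, 2), (5, 0)]);
    println!("{:?}", kept);
}
```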
push_to_pull()? pull_to_push()? switch_push_pull()?
Presently, while we are ticking a stratum, we call try_recv_events after each operator, which means we can receive events at basically any point: a network event not present at the beginning of a stratum can show up in the middle of it.
I think there are three obvious behaviours:
I think (2) and (3) might be indistinguishable semantically, but (1) is different. We currently implement (1). I don't have a strong opinion on what the correct behaviour is here, but my understanding was that the desired behaviour was (3). Probably something we should discuss and figure out.
Is there an expense to the mapping once the compiler has done its magic, relative to tuples that "happen to be" set up right and don't need maps? Would a closure for "key access" on each input help the compiler more than mapping? And/or should we have some fast-path that makes the "relational joins on relational data" go fast?
In the spirit of Bloom's channel, it would be nice to have a single socket handling all the streams running into a Hydroflow node. All the Hydroflow programmer should care about is the name and type of each stream, not the socket associated with it.
push_into()? then()?
Maybe write down in English what it does and find a name in that explanation.
Right now, we can only generate surface syntax when using the macro graph builder logic, which means the only way to get surface syntax out is from the proc macro writing to stdout. Ideally, we could generate surface syntax just like we generate mermaid.