Comments (28)
I wonder if we could handle it by simply sending it to another Broadway pipeline, that has producer-based processes?
I think that could be a really elegant solution!
Currently we landed on using two Broadway pipelines, which is perfectly fine, but if we could essentially compose them together without the need to publish back out to a remote queue, that would be pretty fantastic.
from broadway.
@bdubaut in this case, you don’t need multiple processors. Processors shouldn’t be used to route business logic, for business logic, you can have code in handle_message that matches on the message and dispatches to the appropriate place.
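A minimal sketch of that dispatch idea in plain Elixir (the `:source` field and the handler names are made up for illustration; in a real pipeline the match would live inside `handle_message/3` on the message's data):

```elixir
defmodule WebhookDispatch do
  # Route each event to a source-specific handler by pattern matching,
  # instead of declaring one processor per source.
  def handle(%{source: :github} = event), do: handle_github(event)
  def handle(%{source: :stripe} = event), do: handle_stripe(event)
  def handle(event), do: {:error, {:unknown_source, event}}

  # Stand-ins for the real per-source business logic.
  defp handle_github(%{payload: payload}), do: {:ok, {:github, payload}}
  defp handle_stripe(%{payload: payload}), do: {:ok, {:stripe, payload}}
end
```

The routing stays a plain function call, so it costs nothing at runtime compared to hopping between processor stages.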
I'm working on a CQRS/ES application that consumes events from a RabbitMQ message bus as its producer. What I'd like to do is define different processors so that I can handle the messages differently. I have a web app funneling webhook events through a RabbitMQ instance; that is my producer. I essentially would like to do something like this:
defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(MyBroadway,
      name: MyBroadwayExample,
      producers: [
        default: [
          module: {MyProducerModule, []},
          stages: 1
        ]
      ],
      processors: [
        webhook_source: [
          module: MyWebhookSourceModule,
          stages: 50
        ],
        other_webhook_source: [
          module: MyOtherWebhookSourceModule,
          stages: 50
        ]
      ]
    )
  end
end
This would allow me to route messages to the correct handlers and clean up my broadway declaration file.
I'm happy to help with this, whether it's docs or implementation.
You would disable acking on the first and then include the relevant metadata to ack on the second. If I remember correctly you can update the ack configuration on the fly.
But, as per my previous replies, I am still skeptical that multiple processors are useful except for very few corner cases.
Copied from #36
@mgwidmann said:
The broadway sqs producer is very difficult to tune without this feature. If I cannot separate download concurrency from processing concurrency (by making a separate stage for downloaders and a separate stage for processors) then I cannot scale them independently.
The best workaround currently is to have the processor spawn a few tasks to do the download which I'm currently testing with Flow but that leaves the processor waiting instead of processing an already ready message (say from another downloader).
@josevalim said:
Well, since we are talking about it here, here are my $0.02: I am a bit skeptical that this would make a difference. The VM is really good at multiplexing jobs, so having 2000 stages doing downloads and then processing should be totally fine. I think sending it to another series of stages won't buy you anything, and may only make things slower.
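For what it's worth, the workaround @mgwidmann describes (spawning tasks inside the processor) can be sketched with `Task.async_stream/3` from the standard library, which caps download concurrency independently of the processor count. The URL list and fetch function here are stand-ins:

```elixir
defmodule Downloader do
  # Download a batch of payloads with bounded concurrency, so slow
  # downloads don't serialize behind one another in a single processor.
  def fetch_all(urls, fetch_fun, concurrency \\ 8) do
    urls
    |> Task.async_stream(fetch_fun, max_concurrency: concurrency, timeout: :timer.seconds(30))
    |> Enum.map(fn {:ok, body} -> body end)
  end
end
```

Results come back in input order by default, so this drops in where a sequential `Enum.map/2` was used before.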
@josevalim I'm having trouble seeing how we would partition to the right set of processors. I think we can likely avoid the multiple processors, at least as far as the use cases I've used Broadway in go :)
To me multiple processors would run as steps (one after the other).
@josevalim That's a cool idea, in that case I feel like the processors key is a bit misleading, because I totally understood it as "Here are my separate processors that handle the events in different ways, and you would just need to pattern match on the event to route it to the right module."
Unless I'm the only one that understood it this way, I wonder how to make this clearer either in the doc or in the broadway options themselves 🤔
@josevalim I'd like to make an attempt at this, if it's still viable.
Hi @mcrumm, thanks for the ping but we are still waiting for valid use cases. Do you have any? :D
Hi @josevalim.
I would say that a use case could be the following:
The first-layer processor handler is a GenStage producer_consumer.
So, for each event from the producer it generates several events for the next processor layer, which could be a producer_consumer or a consumer.
We use this kind of GenStage pipeline to fetch data from the Snapchat Marketing API (https://developers.snapchat.com/api/docs/#introduction).
So, the producer fetches entities (e.g., campaigns) and pushes them as events to the next layer.
For each campaign, a producer_consumer from the second layer fetches campaign stats and pushes them as events for the next layer.
A consumer from the last layer adds some additional info from our internal DB for each stat entry and pushes it to some Kafka topic or saves it to a DB, whatever.
Here (http://big-elephants.com/2019-01/facebook-genstage/) is a detailed description of this approach.
It is a little bit outdated, but the idea is the same.
Thanks for sharing @van-mronov! As I mentioned in my replies above, I am still unconvinced the logic you described would really benefit from multiple steps.
For example, in what you described, I would have a single producer, N consumers, and have each of the consumers do the work from beginning to end.
So a consumer would do:
id
|> fetch_campaigns
|> for_each_campaign_get_stats
|> attach_db_info_to_stats
Without the need to break those into a bunch of processors.
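The single-consumer flow above can be sketched in plain Elixir with stub functions (all module and function names are illustrative; the real API and DB calls are replaced with stubs). Note the per-campaign stats fetch can still run concurrently inside that one step via `Task.async_stream/3`:

```elixir
defmodule CampaignSync do
  # One consumer does the whole job end to end: fetch campaigns,
  # fetch stats per campaign (concurrently, but inside this single
  # step), then attach DB info. No extra processor layers needed.
  def run(account_id) do
    account_id
    |> fetch_campaigns()
    |> Task.async_stream(&get_stats/1, max_concurrency: 10)
    |> Enum.map(fn {:ok, stats} -> attach_db_info(stats) end)
  end

  # Stubs standing in for the real API/DB calls.
  defp fetch_campaigns(id), do: [{id, :campaign_a}, {id, :campaign_b}]
  defp get_stats({_id, name}), do: %{campaign: name, clicks: 42}
  defp attach_db_info(stats), do: Map.put(stats, :enriched, true)
end
```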
The reason is that our CPU concurrency is always bounded by the machine's cores, so as long as you have one stage with N processors, where N is the number of cores, you will maximize CPU usage.
But then you can say "but José, they have to do I/O too", and then you can easily start 100 consumers instead, and everything will still work, because the VM is really good at resource allocation and also performs work stealing.
Even if somehow you are already not happy with that, note that you have been doing 1 producer - N consumers so far, but you can also flip it to M producers - N consumers, and that already adds a lot of flexibility to split the load.
So in my mind, the goal of having multiple processors is to effectively maximize the machine resources and I haven't yet heard of a problem where you can't do that with Broadway topologies as is. In fact, I am worried adding this feature will only push people towards less optimal designs, because they will use multiple processors as logical steps, while they are everything BUT that.
Thank you @josevalim. I see what you mean. I believe we started to use several processor layers because for_each_campaign_get_stats could cause several independent HTTP requests. So the idea was to make each of them run in a different process. But, perhaps, we can do this with Task.Supervisor or Flow and Broadway. I need to think about it.
Thanks for the detailed explanation -- that cleared up a number of things for me.
A use case, as it was described to me, wouldn't actually be served by simply adding more processor stages, either. However, I thought it was interesting, so to close the loop on why I resurrected this issue I'll share it here.
So right now, we receive a message in the processor and we can fan in to a batcher. What if we could fan out of batches into a subsequent processor?
So, for instance, I receive messages one at a time in handle_message/3, and then I want to batch them for some bulk operation. But after the bulk operation in handle_batch/4, I need to perform some intensive operation on each message individually before acknowledging it.
Processor --> Batcher --> Processor --> Batcher
@mcrumm yes, I can see this case being a necessity. I wonder if we could handle it by simply sending it to another Broadway pipeline, that has producer-based processes?
Yes please! :)
We have all seen similar pipelines in streaming frameworks like Flink, where you have multiple stages partitioned by key for aggregating or joining messages on a stream. In that case it has value because the processes are distributed and handle their own big state.
For the single-node case, I agree with @josevalim that this could be managed with a single processor stage.
Hello José, thanks for your wonderful work :)
Here is a use case I came across that might benefit from multiple processors:
[producer] get YouTube URL from SQS
[processor] find interesting moments (highlights) in it by analyzing the audio track (ML Python script)
[processor] use ffmpeg to create small clips around these highlights (another Python script running ffmpeg)
[batcher] upload clips to S3
The analysis part is mostly IO and brief single-core bursts, so we want to have e.g. num_analysis = num_cores parallel processes.
The ffmpeg part re-encodes clips to 720p, retrieving relevant parts directly from the source HTTP video; ffmpeg uses all available cores when doing this, so we must limit the number of ffmpeg processes to something reasonable (e.g. num_ffmpeg = num_cores / 4).
We did not want to limit ffmpeg max threads, because we want ffmpeg to use all cores when there's only 1 encoding process in flight.
So combining analysis + ffmpeg in a single processor did not work well, we either had too much ffmpeg or not enough analysis parallel processes.
Note 1: the Erlang VM wonders don't apply here, because the CPU intensive tasks happen in external processes.
Note 2: processors' one_for_all strategy was also an issue; we don't want to kill all these long-running processes when one of them fails.
So I started rewriting the pipeline with GenStage, and got a good measure of how much Broadway facilitated my life before. I think multiple processors (and customisable supervisor strategies) would be a really great addition :)
Hi @flupke!
Note 2: processors' one_for_all strategy was also an issue, we don't want to kill all these long running processes when one of them fails.
This should not be an issue in practice because we rescue any failure during process. The one_for_all is really to handle bugs in Broadway which should not happen.
Other than that, maybe a pool design with a single layer of processors may be good enough for you. The pool will guarantee a certain number of ffmpeg processes are being used and, while it will block, I think it is OK but ffmpeg will be the bottleneck (and it will also use CPU anyway).
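One hedged way to approximate that pool using only the standard library is `Task.async_stream/3` with `max_concurrency`: the analysis step fans out wide while the ffmpeg-like step is capped, all inside one processor layer. The `analyze/1` and `encode/1` functions below are stand-ins for the external Python/ffmpeg scripts:

```elixir
defmodule ClipPipeline do
  # One processor layer; concurrency is bounded per step rather than
  # per stage. The encode step is capped so a few ffmpeg processes can
  # use all cores, while analysis fans out wider.
  def process(urls, num_cores \\ System.schedulers_online()) do
    urls
    |> Task.async_stream(&analyze/1, max_concurrency: num_cores)
    |> Enum.flat_map(fn {:ok, highlights} -> highlights end)
    |> Task.async_stream(&encode/1,
      max_concurrency: max(div(num_cores, 4), 1),
      timeout: :infinity
    )
    |> Enum.map(fn {:ok, clip} -> clip end)
  end

  # Stubs for the external scripts: analysis yields highlight
  # timestamps, encode turns each into a clip.
  defp analyze(url), do: [{url, 0}, {url, 10}]
  defp encode({url, ts}), do: {:clip, url, ts}
end
```

The cap blocks extra work from entering the encode step, which is the pool-like back-pressure behaviour described above.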
This should not be an issue in practice because we rescue any failure during process. The one_for_all is really to handle bugs in Broadway which should not happen.
I see, thank you for the clarification!
Other than that, maybe a pool design with a single layer of processors may be good enough for you. The pool will guarantee a certain number of ffmpeg processes are being used and, while it will block, I think it is OK but ffmpeg will be the bottleneck (and it will also use CPU anyway).
I couldn't tell how that would work, so I did an experiment. The pool implementation seems to be as performant as the 2 stages version (maybe even better on latency, but I have doubts on my code). So this is one more point against multiple processors, and very good news for me since I can stick to Broadway.
Thank you very much!!!
Hello!
(...) I wonder if we could handle it by simply sending it to another Broadway pipeline, that has producer-based processes?
@josevalim, my question is not directly related to mcrumm's case; I am just wondering - when we connect multiple pipelines, how do acknowledgement semantics work?
Would it be necessary to do something like this, for example?
RabbitMQ -> Producer -> Processor -> Batcher -> | pipeline boundary | -> RabbitMQ -> Producer -> Processor -> Batcher
Or do you see a sound way of ACKing from the next pipeline to previous pipeline Batcher? 🤔
(...) But, as per my previous replies, I am still skeptical that multiple processors are useful except for very few corner cases.
Thank you for the answer! By no means am I entering polemics here; I'm just sharing my gut feeling that separating concerns, rather than squeezing too much into a single Processor type, would be a sounder approach :)
Regarding business logic, I have similar thoughts: processors that do different classes of transformation, each dependent on the result of the previous one, make sense to me. It's a different story, but we can have a similar situation in distributed systems based on microservices (or services in general), where every service does something different and a request travels between them until completion.
Please take what I write with a bucket of salt; I don't understand Broadway well enough :)
just sharing my gut feeling that separating concerns, rather than squeezing too much into a single Processor type, would be a sounder approach
Separation of concerns is modeled by modules, not processors. Processors are about runtime properties. Using processors for design purposes will lead to inefficient pipelines.
Thank you for your time, appreciated! I will try to continue with the path/approach you outlined.
I first want to say thank you to everyone involved in Broadway, it's an amazing library! I was considering it and GenStage for a use case where I want batching, rate-limiting, and eventually back-pressure, so it seemed like a better tool than plain GenStage. But I also wanted to implement a topology where one event is processed in parallel (or sequentially) by several different processors that have nothing to do with each other. Something like:
[A - producer] Produce event -> [B - consumer] Send some requests
                             -> [C - consumer] Log event
                             -> [D - consumer] Push some notifications
Up until today I hadn't found any mention in the documentation that Broadway can have only one processor, and the documentation begins with:
Broadway is a concurrent, multi-stage tool for building data ingestion and data processing pipelines.
But then below in the docs it says:
Note that Broadway does not accept multiple producers neither multiple processors. But we choose to keep in a list for simplicity and future proof.
I imagine I can send the events to several different pipelines, which would also allow for more fine-grained producer concurrency control. I also wonder if there's a way to subscribe custom GenStage consumers. Or perhaps use different batchers with duplicate data and in a way let the different batchers be the steps B, C, and D? But I also wanted to ask - if that's the case, what is meant by multi-stage in the first sentence from the docs?
@NicolayD per the discussion above, the general observation is that breaking it into steps will often make things slower, not faster. Your code should rather be:
Producer: A
Processor: B -> C -> D
Or perhaps D could happen in a batch processor.
This is a common misconception where folks want to turn logical design steps into runtime steps, but moving data around is expensive and you should only do it when necessary.
But I also wanted to ask - if that's the case, what is meant by multi-stage in the first sentence from the docs?
Each layer (producer, processor, batcher) has multiple internal stages. But even if you discount that, the skeleton of producer -> processor -> batcher -> batch processor is already multi-stage.
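For reference, here is a hedged sketch of how that skeleton is configured in Broadway 1.x, where each layer's concurrency is set independently (module names are placeholders; this is a configuration fragment and needs the :broadway dependency to run):

```elixir
# Configuration sketch only: one producer layer, one processor layer,
# one batcher layer, each with its own concurrency setting.
Broadway.start_link(MyPipeline,
  name: MyPipeline,
  producer: [module: {MyProducer, []}, concurrency: 2],
  processors: [default: [concurrency: 50]],
  batchers: [default: [concurrency: 4, batch_size: 100]]
)
```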
I see, thank you very much! It does make sense to do it in one step.