
Support multiple processors · broadway · open · 28 comments

dashbitco commented on July 17, 2024
Support multiple processors

from broadway.

Comments (28)

mcrumm commented on July 17, 2024

I wonder if we could handle it by simply sending it to another Broadway pipeline, that has producer-based processes?

I think that could be a really elegant solution!

We currently landed on using two Broadway pipelines, which is perfectly fine, but if we could essentially compose them together without needing to publish back out to a remote queue, that would be pretty fantastic.

josevalim commented on July 17, 2024

just sharing my gut feeling that separation of concerns, rather than trying to squeeze too much stuff into a single Processor type, would be the more sound approach

Separation of concerns is modeled by modules, not processors. Processors are about runtime properties. Using processors for design purposes will lead to inefficient pipelines.

josevalim commented on July 17, 2024

@bdubaut in this case, you don’t need multiple processors. Processors shouldn’t be used to route business logic; for business logic, you can have code in handle_message that matches on the message and dispatches to the appropriate place.
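A sketch of that routing, assuming a single processor layer and hypothetical WebhookHandler/OtherWebhookHandler modules whose process/1 returns the (possibly updated) message:

```elixir
@impl true
def handle_message(_processor, %Broadway.Message{data: data} = message, _context) do
  # Route on the payload, not on separate processor layers.
  # WebhookHandler and OtherWebhookHandler are illustrative names.
  case data do
    %{"source" => "webhook"} -> WebhookHandler.process(message)
    %{"source" => "other"} -> OtherWebhookHandler.process(message)
    _other -> Broadway.Message.failed(message, :unknown_source)
  end
end
```

Failing the message with Broadway.Message.failed/2 keeps unroutable events out of the happy path while still letting the acknowledger see them.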

bdubaut commented on July 17, 2024

I'm working on a CQRS/ES application that consumes events from a RabbitMQ message bus as its producer. What I'd like to do is define different processors so that I can handle the messages differently. I have a web app funneling webhook events through a RabbitMQ instance, which is my producer. I essentially would like to do something like this:

defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(MyBroadway,
      name: MyBroadwayExample,
      producers: [
        default: [
          module: {MyProducerModule, []},
          stages: 1
        ]
      ],
      processors: [
        webhook_source: [
          module: MyWebookSourceModule,
          stages: 50
        ],
        other_webhook_source: [
          module: MyOtherWebookSourceModule,
          stages: 50
        ]
      ]
    )
  end
end

This would allow me to route messages to the correct handlers and clean up my broadway declaration file.

I'm happy to help with this, whether it's docs or implementation.

josevalim commented on July 17, 2024

You would disable acking on the first and then include the relevant metadata to ack on the second. If I remember correctly you can update the ack configuration on the fly.

But, as per my previous replies, I am still skeptical that multiple processors are useful except for very few corner cases.

mgwidmann commented on July 17, 2024

Copied from #36
@mgwidmann said:

The broadway sqs producer is very difficult to tune without this feature. If I cannot separate download concurrency from processing concurrency (by making a separate stage for downloaders and a separate stage for processors) then I cannot scale them independently.

The best workaround currently is to have the processor spawn a few tasks to do the download which I'm currently testing with Flow but that leaves the processor waiting instead of processing an already ready message (say from another downloader).

@josevalim said:

Well, since we are talking about it here, here are my two cents: I am a bit skeptical that this would make a difference. The VM is really good at multiplexing jobs, so having 2000 stages doing downloads and then processing should totally be fine. I think sending it to another series of stages won't buy you anything, and may only make things slower.

whatyouhide commented on July 17, 2024

@josevalim I'm having trouble seeing how we would partition to the right set of processors. I think we can likely avoid the multiple processors, at least as far as the use cases I've used Broadway for go :)

bdubaut commented on July 17, 2024

To me multiple processors would run as steps (one after the other).

@josevalim That's a cool idea, in that case I feel like the processors key is a bit misleading, because I totally understood it as "Here are my separate processors that handle the events in different ways, and you would just need to pattern match on the event to route it to the right module."

Unless I'm the only one that understood it this way, I wonder how to make this clearer either in the doc or in the broadway options themselves 🤔

mcrumm commented on July 17, 2024

@josevalim I'd like to make an attempt at this, if it's still viable.

josevalim commented on July 17, 2024

Hi @mcrumm, thanks for the ping but we are still waiting for valid use cases. Do you have any? :D

van-mronov commented on July 17, 2024

Hi @josevalim.

I would say that a use case could be the following:
The first-layer processor handler is a GenStage producer_consumer.
So, for each event from the producer, it generates several events for the next processor layer, which could be a producer_consumer or a consumer.

We use this kind of GenStage pipeline to fetch data from the Snapchat Marketing API (https://developers.snapchat.com/api/docs/#introduction).
The producer fetches entities (e.g., campaigns) and pushes them as events to the next layer.
For each campaign, a producer_consumer from the second layer fetches campaign stats and pushes them as events for the next layer.
A consumer from the last layer adds some additional info from our internal DB to each stat entry and pushes it to some Kafka topic or saves it to the DB, whatever.

Here (http://big-elephants.com/2019-01/facebook-genstage/) is a detailed description of this approach.
It is a little bit outdated, but the idea is the same.

josevalim commented on July 17, 2024

Thanks for sharing @van-mronov! As I mentioned in my replies above, I am still unconvinced the logic you described would really benefit from multiple steps.

For example, in what you described, I would have a single producer, N consumers, and have each of the consumers do the work from beginning to end.

So a consumer would do:

id
|> fetch_campaigns
|> for_each_campaign_get_stats
|> attach_db_info_to_stats

Without the need to break those into a bunch of processors.

The reason is that our CPU concurrency is always bounded by the machine cores, so as long as you have one stage with N processors, where N is the number of cores, you will maximize CPU usage.

But then you can say "but José, they have to do I/O too"; in that case you can easily start 100 consumers instead, and everything will still work fine because the VM is really good at resource allocation and also performs work stealing.

Even if somehow you are already not happy with that, note that you have been doing 1 producer - N consumers so far, but you can also flip it to M producers - N consumers, and that already adds a lot of flexibility to split the load.

So in my mind, the goal of having multiple processors is to effectively maximize the machine resources and I haven't yet heard of a problem where you can't do that with Broadway topologies as is. In fact, I am worried adding this feature will only push people towards less optimal designs, because they will use multiple processors as logical steps, while they are everything BUT that.
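The single-layer design described above can be sketched as a plain Broadway configuration, using the same producers:/stages: options shown earlier in the thread (MyProducerModule and the numbers are illustrative):

```elixir
defmodule MyPipeline do
  use Broadway

  # One producer, one processor layer; scale by raising `stages`,
  # not by adding more layers.
  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producers: [
        default: [module: {MyProducerModule, []}, stages: 1]
      ],
      processors: [
        default: [stages: 100]
      ]
    )
  end
end
```

Each of the 100 processor stages would then run the whole fetch_campaigns -> get_stats -> attach_db_info sequence from beginning to end.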

van-mronov commented on July 17, 2024

Thank you @josevalim. I see what you mean. I believe we started to use several processor layers because for_each_campaign_get_stats could cause several independent HTTP requests. So the idea was to run each of them in a different process. But, perhaps, we can do this with Task.Supervisor or Flow and Broadway. I need to think about it.

mcrumm commented on July 17, 2024

Thanks for the detailed explanation -- that cleared up a number of things for me.

A use case, as it was described to me, wouldn't actually be served by simply adding more processor stages, either. However, I thought it was interesting, so to close the loop on why I resurrected this issue I'll share it here.

So right now, we receive a message in the processor and we can fan in to a batcher. What if we could fan out of batches, into a subsequent processor?

So, for instance, I receive messages one at a time in handle_message/3, and then I want to batch them for some bulk operation. But after the bulk operation in handle_batch/4, I need to perform some intensive operation on each message individually before acknowledging it.

Processor --> Batcher --> Processor --> Batcher
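One way to approximate the per-message step after the bulk operation, without a second processor layer, is to fan out inside handle_batch/4 itself (bulk_insert/1 and intensive_step/1 are hypothetical helpers):

```elixir
@impl true
def handle_batch(_batcher, messages, _batch_info, _context) do
  # Bulk operation over the whole batch (hypothetical helper).
  :ok = bulk_insert(messages)

  # Then the intensive per-message step, parallelized within the batch.
  # Each task receives and returns a Broadway.Message.
  messages
  |> Task.async_stream(&intensive_step/1, timeout: :infinity)
  |> Enum.map(fn {:ok, message} -> message end)
end
```

Messages are acknowledged only after handle_batch/4 returns, so the intensive step still runs before acking, as desired.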

josevalim commented on July 17, 2024

@mcrumm yes, I can see this case being a necessity. I wonder if we could handle it by simply sending it to another Broadway pipeline, that has producer-based processes?

zachdaniel commented on July 17, 2024

Yes please! :)

matreyes commented on July 17, 2024

We have all seen similar pipelines in streaming frameworks like Flink, where you have multiple stages partitioned by key for aggregating or joining messages on the stream. In that case it has value because the processes are distributed and handle their own big state.
For the single-node case, I agree with @josevalim that this can be managed with one stage of processors.

flupke commented on July 17, 2024

Hello José, thanks for your wonderful work :)

Here is a use case I came across that might benefit from multiple processors:

  • [producer] get youtube URL from SQS
  • [processor] find interesting moments (highlights) in it by analyzing the audio track (ML python script)
  • [processor] use ffmpeg to create small clips around these highlights (another python script running ffmpeg)
  • [batcher] upload clips to S3

The analysis part is mostly I/O and brief single-core bursts, so we want e.g. num_analysis = num_cores parallel processes.

The ffmpeg part reencodes clips to 720p, retrieving relevant parts directly from the source HTTP video; ffmpeg uses all available cores when doing this so we must limit the number of ffmpeg processes to something reasonable (e.g. num_ffmpeg = num_cores / 4).

We did not want to limit ffmpeg max threads, because we want ffmpeg to use all cores when there's only 1 encoding process in flight.

So combining analysis + ffmpeg in a single processor did not work well; we either had too many ffmpeg processes or not enough parallel analysis processes.

Note 1: the Erlang VM's wonders don't apply here, because the CPU-intensive tasks happen in external OS processes.

Note 2: processors' one_for_all strategy was also an issue, we don't want to kill all these long running processes when one of them fails.

So I started rewriting the pipeline with GenStage, and got a good measure of how Broadway facilitated my life before. I think multiple processors (and customisable supervisor strategies) would be a really great addition :)

josevalim commented on July 17, 2024

Hi @flupke!

Note 2: processors' one_for_all strategy was also an issue, we don't want to kill all these long running processes when one of them fails.

This should not be an issue in practice because we rescue any failure during processing. The one_for_all is really to handle bugs in Broadway, which should not happen.

Other than that, maybe a pool design with a single layer of processors may be good enough for you. The pool will guarantee a certain number of ffmpeg processes are being used and, while it will block, I think that is OK because ffmpeg will be the bottleneck (and it will also use CPU anyway).
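A per-caller variant of that bound can be sketched with Task.async_stream/3, capping how many external jobs run at once (a true global bound across all processors would still need a shared worker pool); encode_clip below is a stand-in for the real ffmpeg invocation:

```elixir
# Allow roughly num_cores / 4 concurrent ffmpeg runs, but at least one.
max_ffmpeg = max(div(System.schedulers_online(), 4), 1)

# Stand-in for System.cmd("ffmpeg", [...]) on a clip spec.
encode_clip = fn clip -> {:encoded, clip} end

results =
  [:clip_a, :clip_b, :clip_c]
  |> Task.async_stream(encode_clip, max_concurrency: max_ffmpeg, timeout: :infinity)
  |> Enum.map(fn {:ok, res} -> res end)
```

By default Task.async_stream preserves input order, so results line up with the clips that produced them.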

flupke commented on July 17, 2024

This should not be an issue in practice because we rescue any failure during processing. The one_for_all is really to handle bugs in Broadway, which should not happen.

I see, thank you for the clarification!

Other than that, maybe a pool design with a single layer of processors may be good enough for you. The pool will guarantee a certain number of ffmpeg processes are being used and, while it will block, I think that is OK because ffmpeg will be the bottleneck (and it will also use CPU anyway).

I couldn't tell how that would work, so I did an experiment. The pool implementation seems to be as performant as the two-stage version (maybe even better on latency, but I have doubts about my code). So this is one more point against multiple processors, and very good news for me since I can stick to Broadway.

Thank you very much!!!

filipmnowak commented on July 17, 2024

Hello!

(...) I wonder if we could handle it by simply sending it to another Broadway pipeline, that has producer-based processes?

@josevalim, my question is not directly related to mcrumm's case, I am just wondering: when we connect multiple pipelines, how do the acknowledgement semantics work?

Would it be necessary to do something like this, for example?

RabbitMQ -> Producer -> Processor -> Batcher -> | pipeline boundary | -> RabbitMQ -> Producer -> Processor -> Batcher

Or do you see a sound way of ACKing from the next pipeline to previous pipeline Batcher? 🤔

filipmnowak commented on July 17, 2024

(...) But, as per my previous replies, I am still skeptical that multiple processors are useful except for very few corner cases.

Thank you for the answer! By no means am I entering polemics here, just sharing my gut feeling that separation of concerns, rather than trying to squeeze too much stuff into a single Processor type, would be the more sound approach :)
Regarding business logic, I have similar thoughts: Processors which do different classes of transformation, each dependent on the result of the previous, make sense to me. It's a different story, but we can have a similar situation in distributed systems based on microservices (or services in general), where every service does something else, and a request travels between them until completion.
Please take what I write with a bucket of salt, I don't understand Broadway well enough :)

filipmnowak commented on July 17, 2024

just sharing my gut feeling that separation of concerns, rather than trying to squeeze too much stuff into a single Processor type, would be the more sound approach

Separation of concerns is modeled by modules, not processors. Processors are about runtime properties. Using processors for design purposes will lead to inefficient pipelines.

Thank you for your time, appreciated! I will try to continue with the path/approach you outlined.

NicolayD commented on July 17, 2024

I first want to say thank you to everyone involved in Broadway, it's an amazing library! I was considering it and GenStage for a use case I have where I want batching, rate-limiting, and eventually back-pressure, so it seemed like a better tool than plain GenStage. But I also wanted to be able to implement a topology where one event is processed in parallel (or sequentially) by several different processors that don't have anything to do with each other. Something like:

[A - producer] Produce event -> [B - consumer] Send some requests | [C - consumer] Log event | [D - consumer] Push some notifications

Up until today I didn't find any mention in the documentation that Broadway can have only one processor, and the documentation begins with:

Broadway is a concurrent, multi-stage tool for building data ingestion and data processing pipelines.

But then below in the docs it says:

Note that Broadway does not accept multiple producers neither multiple processors. But we choose to keep in a list for simplicity and future proof.

I imagine I can send the events to several different pipelines; that would also allow more fine-grained producer concurrency control. I also wonder if there's a way to subscribe custom GenStage consumers. Or perhaps use different batchers with duplicate data and in a way let the different batchers be the steps B, C, and D? But I also wanted to ask: if that's the case, what is meant by multi-stage in the first sentence from the docs?

josevalim commented on July 17, 2024

@NicolayD per the discussion above, the general observation is that breaking it into steps will often make things slower than faster. Your code should rather be:

Producer: A
Processor: B -> C -> D

Or perhaps D could happen in a batch processor.

This is a common misconception where folks want to turn logical design steps into pipeline steps, but moving data around is expensive and you should only do it when necessary.

But I also wanted to ask - if that's the case, what is meant by multi-stage in the first sentence from the docs?

Each layer (producer, processor, batcher) has multiple internal stages. But even if you discount that, the skeleton of producer -> processor -> batcher -> batch processor is already multi-stage.
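The B -> C -> D collapse suggested above might look like this inside a single handle_message/3 (send_requests/1, log_event/1, and push_notifications/1 are hypothetical step functions):

```elixir
@impl true
def handle_message(_processor, message, _context) do
  # One processor runs all three logical steps in sequence,
  # instead of three separate consumer stages.
  Broadway.Message.update_data(message, fn event ->
    event
    |> send_requests()
    |> log_event()
    |> push_notifications()
  end)
end
```

If the notification step benefits from batching, it could instead move into handle_batch/4 as José suggests.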

NicolayD commented on July 17, 2024

I see, thank you very much! It does make sense to be in one step.
