
Comments (8)

xxchan commented on September 26, 2024

To conclude, the difference does not seem very large: we need to backfill N times for N MVs, and can only share work after reaching steady state.

  • The largest difference is whether the first MV's backfilling reads from the upstream or from itself.
  • But in case A, later MVs might need to drop a lot of data from S, and thus have larger overhead.

Some further questions:

Why can't Bs catch up with S if they start together?
Because B has the overhead of iterating over every row coming from S:

for (i, (_, row)) in chunk.rows().enumerate() {
    // Look up the split and offset columns of this upstream row.
    let split = row.datum_at(split_idx).unwrap().into_utf8();
    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
    // Fetch the per-split backfill state and let it decide whether the row
    // is visible (i.e., not already covered by the backfill side).
    let backfill_state = backfill_stage.states.get_mut(split).unwrap();
    let vis = backfill_state.handle_upstream_row(offset);
    new_vis.set(i, vis);
}

This might also be why, in the original issue's figure, fragment 1 is faster than fragment 2.
I think the algorithm can be optimized: at the beginning, when backfilling is far from the upstream, we don't need to check the upstream's offset at all; we are in a "fast chasing" stage.
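A minimal sketch of this two-stage idea (all names here are my own, not RisingWave's actual API): while the backfill offset is far behind the upstream's latest offset, skip the per-row comparison entirely and mark every row visible; only once the gap falls below a threshold do we start comparing offsets row by row.

```rust
#[derive(PartialEq)]
enum Stage {
    /// Far behind the upstream: no per-row checks, stream everything.
    FastChasing,
    /// Close to the upstream: compare offsets row by row to avoid duplicates.
    Finishing,
}

/// Decide the visibility of a backfilled row. `backfill_offset` is the row's
/// offset, `upstream_offset` is the latest offset seen from the upstream
/// source, and `threshold` is how close we must get before per-row checks
/// begin. (Offsets simplified to u64; Kafka offsets are strings in the code.)
fn row_visible(stage: &mut Stage, backfill_offset: u64, upstream_offset: u64, threshold: u64) -> bool {
    if *stage == Stage::FastChasing
        && upstream_offset.saturating_sub(backfill_offset) <= threshold
    {
        *stage = Stage::Finishing;
    }
    match stage {
        Stage::FastChasing => true, // no check needed while far behind
        Stage::Finishing => backfill_offset < upstream_offset, // drop rows the upstream already emitted
    }
}
```

The point of the sketch is that the cheap `FastChasing` branch avoids any state lookup or comparison during the bulk of the historical read.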

But even with this optimization, I think it still cannot catch up: if the backfill is fast, then the upstream source also has a lot of work to do, so it has to be backpressured or rate-limited.

Is it possible to share work for historical data?
First, we have to adopt case A.
(BTW, I think changing this behavior isn't a breaking change, so it's fine to experiment with both options.)

Then there are several ways to do it:

  • Manually rate-limit or pause the source first. I don't think this is practical, because it requires the user to have a very deep understanding of how the executors work.
  • Adaptively rate-limit the source while a downstream is backfilling. I think this is hard to implement. At the same time, users may not want other jobs to be affected when creating a new MV on the source.
  • Use transactional DDL to create the MVs together.

Do we want to share work for historical data?
I'm not sure.

  • Is it common to have a lot of historical data to ingest from Kafka in reality?
  • A more important question might be: is it common to create many MVs on the same source? Why not materialize the source into a table or MV first? At the end of the day, what's the benefit of a shared source over a materialized source (besides storage)? One possible use case is routing messages from one source (a unified topic) to multiple ones (as we do in the nexmark benchmark). But in reality, why not do the routing inside Kafka?

Anyway, if such use cases exist, it's of course nice to share work.

from risingwave.

xxchan commented on September 26, 2024

However, to implement this, we need to somehow let the MV know whether it's the first one.

I came up with a simple solution: sleep 1s after the SourceExecutor is resumed. 🤡

Another simple solution: change the poll strategy in the SourceBackfillExecutor. At the beginning, prefer the backfill side; later, switch to preferring the upstream side. However, I'm not sure whether the first poll can get data right after the Kafka reader has just been created.
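The preference-switching idea can be sketched as a tiny strategy object (names are mine, not the actual executor's): given which sides currently have data ready, pick the preferred side first and fall back to the other.

```rust
#[derive(Debug, PartialEq)]
enum Side {
    Backfill,
    Upstream,
}

/// Hypothetical poll strategy: before catching up, drain the backfill side
/// first; afterwards, drain the upstream side first.
struct PollStrategy {
    caught_up: bool,
}

impl PollStrategy {
    /// Pick which side to poll next, given which sides have data ready.
    fn pick(&self, backfill_ready: bool, upstream_ready: bool) -> Option<Side> {
        let (preferred, fallback) = if self.caught_up {
            (upstream_ready.then_some(Side::Upstream), backfill_ready.then_some(Side::Backfill))
        } else {
            (backfill_ready.then_some(Side::Backfill), upstream_ready.then_some(Side::Upstream))
        };
        preferred.or(fallback)
    }
}
```

In the real executor the two "sides" are streams merged by the event loop; the sketch only captures the ordering decision, not the async plumbing.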


xxchan commented on September 26, 2024

@BugenZhao reminded me that even after this problem is solved, some scenarios are still not optimal: e.g., if we create 10 MVs together, we cannot ensure the later MVs can catch up faster (and thus can share work).


xxchan commented on September 26, 2024

if we create 10 MVs together, we cannot ensure the later MVs can catch up faster (and thus can share work).

If so (i.e., we don't want to optimize this scenario), maybe it's better to let the upstream SourceExecutor start directly from the latest offset.

Assume there is a lot of historical data, and relatively low throughput of new data.

Let's compare: (S=SourceExecutor, B=SourceBackfillExecutor)

case A. S starts from the specified offset, same as B

S begins with a high workload reading historical data. The first B cannot catch up (as mentioned in this issue). We can implement some mechanism to let the first B finish immediately and forward S (i.e., B = S).

While S hasn't finished reading historical data, if we now create a second B (B2), it probably cannot catch up either. B2 also needs to drop a lot of data from S.

After S finishes reading historical data, it performs like case B.

case B. S starts from latest

S begins with a low workload. All Bs perform similarly, and none of them needs to drop a lot of data from S.

A small difference from case A is that now the 1st B cannot finish immediately:

(figure omitted)

This also means we do not need to implement the special treatment mentioned in this issue. At the same time, we might need to change the timing of when the upstream source creates its reader, to make sure it starts from the latest offset when resumed (i.e., when the first MV is created).


xxchan commented on September 26, 2024

I think case B (source starting from latest) is simple and works well in most simple cases.

changing this behavior isn't a breaking change

That's because backfill always starts from the specified offset, and where the upstream starts only affects when backfilling finishes.

For the source executor, it only changes the starting position. When an offset is already persisted in the state table, we will use that, so it's also fine.
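The claim above can be sketched as a start-position resolution rule (all names hypothetical, offsets simplified to u64): a persisted offset in the state table always wins, so switching the fresh-start default between a user-specified offset and "latest" does not affect recovery and is not a breaking change.

```rust
#[derive(Debug, PartialEq)]
enum StartPos {
    Persisted(u64), // offset recovered from the state table
    Specified(u64), // user-specified start offset (case A)
    Latest,         // start from the latest offset (case B)
}

/// Resolve where the source executor starts reading.
fn resolve_start(persisted: Option<u64>, specified: Option<u64>, start_from_latest: bool) -> StartPos {
    match (persisted, start_from_latest, specified) {
        // Recovery: the persisted state-table offset always takes precedence.
        (Some(off), _, _) => StartPos::Persisted(off),
        // Fresh start, case B: ignore the specified offset, start from latest.
        (None, true, _) => StartPos::Latest,
        // Fresh start, case A: honor the specified offset if there is one.
        (None, false, Some(off)) => StartPos::Specified(off),
        (None, false, None) => StartPos::Latest,
    }
}
```

Only the two "fresh start" arms differ between case A and case B; the recovery arm is identical, which is the sense in which the change is non-breaking.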


tabVersion commented on September 26, 2024

Is it common to have a lot of historical data to ingest from Kafka in reality?

In most business logic, Kafka historical data does not matter; it is far from "real-time". But for a streaming database, a more common case is that online data is first written to OLTP for online serving and duplicated to Kafka for analysis. So this feature is not the most frequently used one, but it is essential.


tabVersion commented on September 26, 2024

Actually for the first MV, we can directly skip the backfill stage, since the source is paused before the MV is created. However, to implement this, we need to somehow let the MV know whether it's the first...

I have a small concern with the PR's approach: when creating a streaming job on a source, we update relation_ref_count on the frontend. We can get the exact number there instead of doing hacks.


xxchan commented on September 26, 2024

@tabVersion With #16626, we are not going to implement the original idea of this issue any more. We will backfill for every MV 👀

