
Comments (5)

lmatz avatar lmatz commented on September 26, 2024 1

This is the Flink plan after enabling table.optimizer.distinct-agg.split.enabled: true

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.nexmark_q15], fields=[day, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12])
+- GroupAggregate(groupBy=[day], partialFinalType=[FINAL], select=[day, $SUM0_RETRACT($f3) AS $f1, $SUM0_RETRACT($f4) AS $f2, $SUM0_RETRACT($f5) AS $f3, $SUM0_RETRACT($f6_0) AS $f4, $SUM0_RETRACT($f7_0) AS $f5, $SUM0_RETRACT($f8) AS $f6, $SUM0_RETRACT($f9) AS $f7, $SUM0_RETRACT($f10) AS $f8, $SUM0_RETRACT($f11) AS $f9, $SUM0_RETRACT($f12) AS $f10, $SUM0_RETRACT($f13) AS $f11, $SUM0_RETRACT($f14) AS $f12])
   +- Exchange(distribution=[hash[day]])
      +- GroupAggregate(groupBy=[day, $f6, $f7], partialFinalType=[PARTIAL], select=[day, $f6, $f7, COUNT(*) FILTER $g_3 AS $f3, COUNT(*) FILTER $g_30 AS $f4, COUNT(*) FILTER $g_31 AS $f5, COUNT(*) FILTER $g_32 AS $f6_0, COUNT(DISTINCT bidder) FILTER $g_1 AS $f7_0, COUNT(DISTINCT bidder) FILTER $g_10 AS $f8, COUNT(DISTINCT bidder) FILTER $g_11 AS $f9, COUNT(DISTINCT bidder) FILTER $g_12 AS $f10, COUNT(DISTINCT auction) FILTER $g_2 AS $f11, COUNT(DISTINCT auction) FILTER $g_20 AS $f12, COUNT(DISTINCT auction) FILTER $g_21 AS $f13, COUNT(DISTINCT auction) FILTER $g_22 AS $f14])
         +- Exchange(distribution=[hash[day, $f6, $f7]])
            +- Calc(select=[day, $f1, $f2, $f3, bidder, auction, $f6, $f7, =($e, 3) AS $g_3, AND(=($e, 3), $f1) AS $g_30, AND(=($e, 3), $f2) AS $g_31, AND(=($e, 3), $f3) AS $g_32, =($e, 1) AS $g_1, AND(=($e, 1), $f1) AS $g_10, AND(=($e, 1), $f2) AS $g_11, AND(=($e, 1), $f3) AS $g_12, =($e, 2) AS $g_2, AND(=($e, 2), $f1) AS $g_20, AND(=($e, 2), $f2) AS $g_21, AND(=($e, 2), $f3) AS $g_22])
               +- Expand(projects=[{day, $f1, $f2, $f3, bidder, auction, $f6, null AS $f7, 1 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, $f7, 2 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, null AS $f7, 3 AS $e}])
                  +- Calc(select=[DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), _UTF-16LE'yyyy-MM-dd') AS day, IS TRUE(<(bid.price, 10000)) AS $f1, IS TRUE(SEARCH(bid.price, Sarg[[10000..1000000)])) AS $f2, IS TRUE(>=(bid.price, 1000000)) AS $f3, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(bid.bidder), 1024) AS $f6, MOD(HASH_CODE(bid.auction), 1024) AS $f7], where=[=(event_type, 2)])
                     +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
                        +- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
                           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.nexmark_q15], fields=[day, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12])
+- GroupAggregate(groupBy=[day], partialFinalType=[FINAL], select=[day, $SUM0_RETRACT($f3) AS $f1, $SUM0_RETRACT($f4) AS $f2, $SUM0_RETRACT($f5) AS $f3, $SUM0_RETRACT($f6_0) AS $f4, $SUM0_RETRACT($f7_0) AS $f5, $SUM0_RETRACT($f8) AS $f6, $SUM0_RETRACT($f9) AS $f7, $SUM0_RETRACT($f10) AS $f8, $SUM0_RETRACT($f11) AS $f9, $SUM0_RETRACT($f12) AS $f10, $SUM0_RETRACT($f13) AS $f11, $SUM0_RETRACT($f14) AS $f12])
   +- Exchange(distribution=[hash[day]])
      +- GroupAggregate(groupBy=[day, $f6, $f7], partialFinalType=[PARTIAL], select=[day, $f6, $f7, COUNT(*) FILTER $g_3 AS $f3, COUNT(*) FILTER $g_30 AS $f4, COUNT(*) FILTER $g_31 AS $f5, COUNT(*) FILTER $g_32 AS $f6_0, COUNT(DISTINCT bidder) FILTER $g_1 AS $f7_0, COUNT(DISTINCT bidder) FILTER $g_10 AS $f8, COUNT(DISTINCT bidder) FILTER $g_11 AS $f9, COUNT(DISTINCT bidder) FILTER $g_12 AS $f10, COUNT(DISTINCT auction) FILTER $g_2 AS $f11, COUNT(DISTINCT auction) FILTER $g_20 AS $f12, COUNT(DISTINCT auction) FILTER $g_21 AS $f13, COUNT(DISTINCT auction) FILTER $g_22 AS $f14])
         +- Exchange(distribution=[hash[day, $f6, $f7]])
            +- Calc(select=[day, $f1, $f2, $f3, bidder, auction, $f6, $f7, ($e = 3) AS $g_3, (($e = 3) AND $f1) AS $g_30, (($e = 3) AND $f2) AS $g_31, (($e = 3) AND $f3) AS $g_32, ($e = 1) AS $g_1, (($e = 1) AND $f1) AS $g_10, (($e = 1) AND $f2) AS $g_11, (($e = 1) AND $f3) AS $g_12, ($e = 2) AS $g_2, (($e = 2) AND $f1) AS $g_20, (($e = 2) AND $f2) AS $g_21, (($e = 2) AND $f3) AS $g_22])
               +- Expand(projects=[{day, $f1, $f2, $f3, bidder, auction, $f6, null AS $f7, 1 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, $f7, 2 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, null AS $f7, 3 AS $e}])
                  +- Calc(select=[DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), 'yyyy-MM-dd') AS day, (bid.price < 10000) IS TRUE AS $f1, SEARCH(bid.price, Sarg[[10000..1000000)]) IS TRUE AS $f2, (bid.price >= 1000000) IS TRUE AS $f3, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(bid.bidder), 1024) AS $f6, MOD(HASH_CODE(bid.auction), 1024) AS $f7], where=[(event_type = 2)])
                     +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
                        +- Calc(select=[event_type, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
                           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

link #11964
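
The split in the plan above can be illustrated with a minimal sketch. This is not Flink's implementation: the function name `split_count_distinct` is hypothetical, Python's built-in `hash` stands in for `HASH_CODE`, and only one distinct column (bidder) is handled. The partial phase groups by (day, bucket) where the bucket is the hash of the distinct key modulo 1024; because each bidder always lands in the same bucket, the per-bucket distinct counts are disjoint and the final phase can simply sum them per day.

```python
from collections import defaultdict

NUM_BUCKETS = 1024  # same modulus as MOD(HASH_CODE(...), 1024) in the plan


def split_count_distinct(rows, num_buckets=NUM_BUCKETS):
    """Count distinct bidders per day in two phases.

    rows: iterable of (day, bidder) pairs.
    """
    # Partial phase: group by (day, bucket). A given bidder always maps to
    # the same bucket, so the buckets hold disjoint sets of bidders.
    partial = defaultdict(set)
    for day, bidder in rows:
        bucket = hash(bidder) % num_buckets
        partial[(day, bucket)].add(bidder)

    # Final phase: group by day and sum the partial distinct counts
    # (the role played by $SUM0 in the FINAL GroupAggregate).
    final = defaultdict(int)
    for (day, _bucket), bidders in partial.items():
        final[day] += len(bidders)
    return dict(final)


rows = [("2024-09-26", 1), ("2024-09-26", 2), ("2024-09-26", 1), ("2024-09-27", 1)]
result = split_count_distinct(rows)
```

The point of the extra bucket column is that the partial phase is hashed on (day, bucket) rather than on day alone, so a single hot day can be spread over many parallel operators.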

from risingwave.

lmatz avatar lmatz commented on September 26, 2024

Let's compare the metrics between RW 1X and RW 4X.




7.43 * 4 = 29.72 < 33, so 4X uses more than four times the memory of 1X.

Agg's Cache Miss Rate



The aggregation cache miss rate of 1X is even higher than that of 4X.

Agg's Cached Keys



Hummock Read



4X's block cache miss rate is even lower than 1X's, although both cache miss rates fluctuate periodically.


lmatz avatar lmatz commented on September 26, 2024

q15 uses distinct aggregation, a complex operator that may become the bottleneck.

We remove the distinct keyword from all the aggregations and introduce q15-no-distinct:

CREATE SINK nexmark_q15_no_distinct AS
    SELECT to_char(date_time, 'YYYY-MM-DD')                                          as "day",
           count(*)                                                                  AS total_bids,
           count(*) filter (where price < 10000)                                     AS rank1_bids,
           count(*) filter (where price >= 10000 and price < 1000000)                AS rank2_bids,
           count(*) filter (where price >= 1000000)                                  AS rank3_bids,
           count(bidder)                                                    AS total_bidders,
           count(bidder) filter (where price < 10000)                       AS rank1_bidders,
           count(bidder) filter (where price >= 10000 and price < 1000000)  AS rank2_bidders,
           count(bidder) filter (where price >= 1000000)                    AS rank3_bidders,
           count(auction)                                                   AS total_auctions,
           count(auction) filter (where price < 10000)                      AS rank1_auctions,
           count(auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
           count(auction) filter (where price >= 1000000)                   AS rank3_auctions
    FROM bid
    GROUP BY to_char(date_time, 'YYYY-MM-DD')
    WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

Since the only group-by column is a single time column, we use two-phase aggregation by default.

Therefore, the plan is:

 StreamSink { type: append-only, columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions] }
 └─StreamProject { exprs: [$expr2, sum0(count), sum0(count filter(($expr3 < 10000:Int32))), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count filter(($expr3 >= 1000000:Int32))), sum0(count($expr4)), sum0(count($expr4) filter(($expr3 < 10000:Int32))), sum0(count($expr4) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr4) filter(($expr3 >= 1000000:Int32))), sum0(count($expr5)), sum0(count($expr5) filter(($expr3 < 10000:Int32))), sum0(count($expr5) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr5) filter(($expr3 >= 1000000:Int32)))] }
   └─StreamHashAgg { group_key: [$expr2], aggs: [sum0(count), sum0(count filter(($expr3 < 10000:Int32))), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count filter(($expr3 >= 1000000:Int32))), sum0(count($expr4)), sum0(count($expr4) filter(($expr3 < 10000:Int32))), sum0(count($expr4) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr4) filter(($expr3 >= 1000000:Int32))), sum0(count($expr5)), sum0(count($expr5) filter(($expr3 < 10000:Int32))), sum0(count($expr5) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr5) filter(($expr3 >= 1000000:Int32))), count] }
     └─StreamExchange { dist: HashShard($expr2) }
       └─StreamHashAgg [append_only] { group_key: [$expr2, $expr6], aggs: [count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32)), count($expr4), count($expr4) filter(($expr3 < 10000:Int32)), count($expr4) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count($expr4) filter(($expr3 >= 1000000:Int32)), count($expr5), count($expr5) filter(($expr3 < 10000:Int32)), count($expr5) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count($expr5) filter(($expr3 >= 1000000:Int32))] }
         └─StreamProject { exprs: [ToChar($expr1, 'YYYY-MM-DD':Varchar) as $expr2, Field(bid, 2:Int32) as $expr3, Field(bid, 1:Int32) as $expr4, Field(bid, 0:Int32) as $expr5, _row_id, Vnode(_row_id) as $expr6] }
           └─StreamFilter { predicate: (event_type = 2:Int32) }
             └─StreamRowIdGen { row_id_index: 6 }
               └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
                 └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                   └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(11 rows)
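
As a rough illustration of the two-phase shape in the plan above (a sketch, not RisingWave's executor code): the first phase counts per (day, vnode) and the second phase sums those partial counts per day, which is what `sum0(count ...)` does. The helper names `price_rank` and `two_phase_counts` are hypothetical, `row_id % num_vnodes` is only a stand-in for `Vnode(_row_id)`, and the value 256 for `num_vnodes` is an assumption made for the example.

```python
from collections import Counter


def price_rank(price):
    """Map a bid price to the rank buckets used by the q15 filters."""
    if price < 10_000:
        return "rank1"
    if price < 1_000_000:
        return "rank2"
    return "rank3"


def two_phase_counts(bids, num_vnodes=256):
    """bids: iterable of (row_id, day, price) tuples."""
    # Phase 1: local aggregation keyed by (day, vnode), so rows of the same
    # day are spread across many partial aggregation groups.
    partial = Counter()
    for row_id, day, price in bids:
        vnode = row_id % num_vnodes  # stand-in for Vnode(_row_id)
        partial[(day, vnode, "total")] += 1
        partial[(day, vnode, price_rank(price))] += 1

    # Phase 2: global aggregation keyed by day, summing the partial counts.
    final = Counter()
    for (day, _vnode, name), count in partial.items():
        final[(day, name)] += count
    return dict(final)


bids = [(0, "2024-09-26", 5), (1, "2024-09-26", 20_000),
        (2, "2024-09-26", 2_000_000), (3, "2024-09-26", 7)]
counts = two_phase_counts(bids)
```

Plain counts can be merged by addition regardless of how rows are distributed, which is why this query needs no special handling in the second phase, unlike the distinct variant.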

RW 1X:

RW 4X:

RW 4X: 3.3M
RW 1X: 923K

Both are close to the throughput of a stateless query.

4X/1X Ratio: 3.57

Both the scalability and the absolute throughput of RW are much better when there is no distinct.

Flink 1X:

Flink 4X:

Flink 4X: 3.3M
Flink 1X: 1M

We can conclude that q15-no-distinct is a scalable query for both RW and Flink.
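
The scaling comparison can be reproduced from the rounded throughput figures quoted above. This is a small sketch; `scaling_ratio` and `scaling_efficiency` are hypothetical helper names, and because the inputs are the rounded numbers from this thread, the results may differ in the last digit from ratios computed on the raw measurements.

```python
def scaling_ratio(throughput_4x, throughput_1x):
    """Speedup obtained by going from the 1X to the 4X configuration."""
    return throughput_4x / throughput_1x


def scaling_efficiency(throughput_4x, throughput_1x, factor=4):
    """Fraction of ideal linear scaling that was achieved (1.0 is perfect)."""
    return scaling_ratio(throughput_4x, throughput_1x) / factor


# Rounded throughput figures from the comment above.
rw_ratio = scaling_ratio(3_300_000, 923_000)
flink_ratio = scaling_ratio(3_300_000, 1_000_000)

print(f"RW 4X/1X ratio: {rw_ratio:.2f}, "
      f"efficiency: {scaling_efficiency(3_300_000, 923_000):.0%}")
print(f"Flink 4X/1X ratio: {flink_ratio:.2f}, "
      f"efficiency: {scaling_efficiency(3_300_000, 1_000_000):.0%}")
```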


lmatz avatar lmatz commented on September 26, 2024

With @st1page and @Little-Wallace, we suspected that the aggregation dirty heap size, 64MB by default, is limiting the throughput.
The reason is that, given the group-by columns (day, bidder, and auction) and the data pattern, the skew is strong: all the data goes to a single partial aggregation operator at any given time.

We found that the aggregation dirty heap size fluctuates around 64MB:

The figure above comes from:

Flink has a similar knob to tune.

However, setting stream_hash_agg_max_dirty_groups_heap_size to 268435456 (256MB) does not change the throughput.
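
For reference, the byte value used for the setting can be checked with a short sketch. It assumes "MB" here means binary megabytes (1024 * 1024 bytes), which is what the value 268435456 implies; `mib_to_bytes` is a hypothetical helper.

```python
MIB = 1024 * 1024  # bytes per binary megabyte (assumption: MB means MiB here)


def mib_to_bytes(mib):
    """Convert a size in binary megabytes to bytes."""
    return mib * MIB


default_limit = mib_to_bytes(64)   # the default mentioned in this thread
raised_limit = mib_to_bytes(256)   # the value tried in this experiment
print(default_limit, raised_limit)
```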

The two executions mentioned in this thread can be found at metabase:


lmatz avatar lmatz commented on September 26, 2024

#15696 has improved by about 20%

