Comments (5)
This is the Flink plan after enabling table.optimizer.distinct-agg.split.enabled: true
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.nexmark_q15], fields=[day, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12])
+- GroupAggregate(groupBy=[day], partialFinalType=[FINAL], select=[day, $SUM0_RETRACT($f3) AS $f1, $SUM0_RETRACT($f4) AS $f2, $SUM0_RETRACT($f5) AS $f3, $SUM0_RETRACT($f6_0) AS $f4, $SUM0_RETRACT($f7_0) AS $f5, $SUM0_RETRACT($f8) AS $f6, $SUM0_RETRACT($f9) AS $f7, $SUM0_RETRACT($f10) AS $f8, $SUM0_RETRACT($f11) AS $f9, $SUM0_RETRACT($f12) AS $f10, $SUM0_RETRACT($f13) AS $f11, $SUM0_RETRACT($f14) AS $f12])
+- Exchange(distribution=[hash[day]])
+- GroupAggregate(groupBy=[day, $f6, $f7], partialFinalType=[PARTIAL], select=[day, $f6, $f7, COUNT(*) FILTER $g_3 AS $f3, COUNT(*) FILTER $g_30 AS $f4, COUNT(*) FILTER $g_31 AS $f5, COUNT(*) FILTER $g_32 AS $f6_0, COUNT(DISTINCT bidder) FILTER $g_1 AS $f7_0, COUNT(DISTINCT bidder) FILTER $g_10 AS $f8, COUNT(DISTINCT bidder) FILTER $g_11 AS $f9, COUNT(DISTINCT bidder) FILTER $g_12 AS $f10, COUNT(DISTINCT auction) FILTER $g_2 AS $f11, COUNT(DISTINCT auction) FILTER $g_20 AS $f12, COUNT(DISTINCT auction) FILTER $g_21 AS $f13, COUNT(DISTINCT auction) FILTER $g_22 AS $f14])
+- Exchange(distribution=[hash[day, $f6, $f7]])
+- Calc(select=[day, $f1, $f2, $f3, bidder, auction, $f6, $f7, =($e, 3) AS $g_3, AND(=($e, 3), $f1) AS $g_30, AND(=($e, 3), $f2) AS $g_31, AND(=($e, 3), $f3) AS $g_32, =($e, 1) AS $g_1, AND(=($e, 1), $f1) AS $g_10, AND(=($e, 1), $f2) AS $g_11, AND(=($e, 1), $f3) AS $g_12, =($e, 2) AS $g_2, AND(=($e, 2), $f1) AS $g_20, AND(=($e, 2), $f2) AS $g_21, AND(=($e, 2), $f3) AS $g_22])
+- Expand(projects=[{day, $f1, $f2, $f3, bidder, auction, $f6, null AS $f7, 1 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, $f7, 2 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, null AS $f7, 3 AS $e}])
+- Calc(select=[DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), _UTF-16LE'yyyy-MM-dd') AS day, IS TRUE(<(bid.price, 10000)) AS $f1, IS TRUE(SEARCH(bid.price, Sarg[[10000..1000000)])) AS $f2, IS TRUE(>=(bid.price, 1000000)) AS $f3, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(bid.bidder), 1024) AS $f6, MOD(HASH_CODE(bid.auction), 1024) AS $f7], where=[=(event_type, 2)])
+- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
+- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
+- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.nexmark_q15], fields=[day, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12])
+- GroupAggregate(groupBy=[day], partialFinalType=[FINAL], select=[day, $SUM0_RETRACT($f3) AS $f1, $SUM0_RETRACT($f4) AS $f2, $SUM0_RETRACT($f5) AS $f3, $SUM0_RETRACT($f6_0) AS $f4, $SUM0_RETRACT($f7_0) AS $f5, $SUM0_RETRACT($f8) AS $f6, $SUM0_RETRACT($f9) AS $f7, $SUM0_RETRACT($f10) AS $f8, $SUM0_RETRACT($f11) AS $f9, $SUM0_RETRACT($f12) AS $f10, $SUM0_RETRACT($f13) AS $f11, $SUM0_RETRACT($f14) AS $f12])
+- Exchange(distribution=[hash[day]])
+- GroupAggregate(groupBy=[day, $f6, $f7], partialFinalType=[PARTIAL], select=[day, $f6, $f7, COUNT(*) FILTER $g_3 AS $f3, COUNT(*) FILTER $g_30 AS $f4, COUNT(*) FILTER $g_31 AS $f5, COUNT(*) FILTER $g_32 AS $f6_0, COUNT(DISTINCT bidder) FILTER $g_1 AS $f7_0, COUNT(DISTINCT bidder) FILTER $g_10 AS $f8, COUNT(DISTINCT bidder) FILTER $g_11 AS $f9, COUNT(DISTINCT bidder) FILTER $g_12 AS $f10, COUNT(DISTINCT auction) FILTER $g_2 AS $f11, COUNT(DISTINCT auction) FILTER $g_20 AS $f12, COUNT(DISTINCT auction) FILTER $g_21 AS $f13, COUNT(DISTINCT auction) FILTER $g_22 AS $f14])
+- Exchange(distribution=[hash[day, $f6, $f7]])
+- Calc(select=[day, $f1, $f2, $f3, bidder, auction, $f6, $f7, ($e = 3) AS $g_3, (($e = 3) AND $f1) AS $g_30, (($e = 3) AND $f2) AS $g_31, (($e = 3) AND $f3) AS $g_32, ($e = 1) AS $g_1, (($e = 1) AND $f1) AS $g_10, (($e = 1) AND $f2) AS $g_11, (($e = 1) AND $f3) AS $g_12, ($e = 2) AS $g_2, (($e = 2) AND $f1) AS $g_20, (($e = 2) AND $f2) AS $g_21, (($e = 2) AND $f3) AS $g_22])
+- Expand(projects=[{day, $f1, $f2, $f3, bidder, auction, $f6, null AS $f7, 1 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, $f7, 2 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, null AS $f7, 3 AS $e}])
+- Calc(select=[DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), 'yyyy-MM-dd') AS day, (bid.price < 10000) IS TRUE AS $f1, SEARCH(bid.price, Sarg[[10000..1000000)]) IS TRUE AS $f2, (bid.price >= 1000000) IS TRUE AS $f3, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(bid.bidder), 1024) AS $f6, MOD(HASH_CODE(bid.auction), 1024) AS $f7], where=[(event_type = 2)])
+- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
+- Calc(select=[event_type, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
+- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
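To make the split concrete: the Expand node emits three copies of every row (one per $e value), the PARTIAL aggregate deduplicates each distinct key inside MOD(HASH_CODE(...), 1024) buckets, and the FINAL aggregate sums the per-bucket counts. Below is a minimal hand-written equivalent for a single COUNT(DISTINCT bidder); this is a sketch only, bids_per_day is a hypothetical input with one row per bid, and the real plan covers all twelve aggregates at once through the Expand node:

-- Sketch of the distinct-agg split for one aggregate.
-- Phase 1 (PARTIAL): each bidder hashes into exactly one of 1024 buckets,
-- so deduplication work is spread across parallel tasks instead of piling
-- up on the single hot day.
-- Phase 2 (FINAL): the buckets are disjoint, so summing the per-bucket
-- distinct counts yields COUNT(DISTINCT bidder) per day.
SELECT day,
       SUM(partial_bidders) AS total_bidders
FROM (
  SELECT day,
         MOD(HASH_CODE(bidder), 1024) AS bucket,
         COUNT(DISTINCT bidder) AS partial_bidders
  FROM bids_per_day  -- hypothetical table: (day, bidder)
  GROUP BY day, MOD(HASH_CODE(bidder), 1024)
) t
GROUP BY day;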
link #11964
Let's compare the metrics between RW 1X and RW 4X.
Memory
7.43 * 4 = 29.72 < 33, so the 4X deployment is using even more memory than four times what 1X uses.
Agg's Cache Miss Rate
1X's Agg's cache miss rate is even higher than 4X's.
Agg's Cached Keys
Hummock Read
4X's block cache miss rate is even lower than 1X's, although both cache miss rates show periodic fluctuation.
Since q15 has distinct aggregation, which is a complex operator that may become the bottleneck, we removed the distinct keyword from all the aggregations and introduced q15-no-distinct:
https://github.com/risingwavelabs/kube-bench/blob/main/manifests/nexmark/nexmark-sinks.template.yaml#L597-L617
CREATE SINK nexmark_q15_no_distinct AS
SELECT to_char(date_time, 'YYYY-MM-DD') as "day",
       count(*) AS total_bids,
       count(*) filter (where price < 10000) AS rank1_bids,
       count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids,
       count(*) filter (where price >= 1000000) AS rank3_bids,
       count(bidder) AS total_bidders,
       count(bidder) filter (where price < 10000) AS rank1_bidders,
       count(bidder) filter (where price >= 10000 and price < 1000000) AS rank2_bidders,
       count(bidder) filter (where price >= 1000000) AS rank3_bidders,
       count(auction) AS total_auctions,
       count(auction) filter (where price < 10000) AS rank1_auctions,
       count(auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
       count(auction) filter (where price >= 1000000) AS rank3_auctions
FROM bid
GROUP BY to_char(date_time, 'YYYY-MM-DD')
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
Since the group-by key is a single time column, by default we use two-phase aggregation:
https://github.com/risingwavelabs/kube-bench/blob/main/env.toml#L115
Therefore, the plan is:
StreamSink { type: append-only, columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions] }
└─StreamProject { exprs: [$expr2, sum0(count), sum0(count filter(($expr3 < 10000:Int32))), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count filter(($expr3 >= 1000000:Int32))), sum0(count($expr4)), sum0(count($expr4) filter(($expr3 < 10000:Int32))), sum0(count($expr4) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr4) filter(($expr3 >= 1000000:Int32))), sum0(count($expr5)), sum0(count($expr5) filter(($expr3 < 10000:Int32))), sum0(count($expr5) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr5) filter(($expr3 >= 1000000:Int32)))] }
└─StreamHashAgg { group_key: [$expr2], aggs: [sum0(count), sum0(count filter(($expr3 < 10000:Int32))), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count filter(($expr3 >= 1000000:Int32))), sum0(count($expr4)), sum0(count($expr4) filter(($expr3 < 10000:Int32))), sum0(count($expr4) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr4) filter(($expr3 >= 1000000:Int32))), sum0(count($expr5)), sum0(count($expr5) filter(($expr3 < 10000:Int32))), sum0(count($expr5) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr5) filter(($expr3 >= 1000000:Int32))), count] }
└─StreamExchange { dist: HashShard($expr2) }
└─StreamHashAgg [append_only] { group_key: [$expr2, $expr6], aggs: [count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32)), count($expr4), count($expr4) filter(($expr3 < 10000:Int32)), count($expr4) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count($expr4) filter(($expr3 >= 1000000:Int32)), count($expr5), count($expr5) filter(($expr3 < 10000:Int32)), count($expr5) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count($expr5) filter(($expr3 >= 1000000:Int32))] }
└─StreamProject { exprs: [ToChar($expr1, 'YYYY-MM-DD':Varchar) as $expr2, Field(bid, 2:Int32) as $expr3, Field(bid, 1:Int32) as $expr4, Field(bid, 0:Int32) as $expr5, _row_id, Vnode(_row_id) as $expr6] }
└─StreamFilter { predicate: (event_type = 2:Int32) }
└─StreamRowIdGen { row_id_index: 6 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
└─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
└─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(11 rows)
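In this plan, the lower StreamHashAgg (group_key [$expr2, $expr6], where $expr6 is Vnode(_row_id)) is the local phase, and the upper StreamHashAgg keyed by the day alone is the global phase that sums the partial counts. Here is a minimal sketch of the same idea for total_bids only, assuming a hypothetical shard column that stands in for the internal Vnode(_row_id) key (which is not expressible in user SQL):

-- Sketch of the two-phase aggregation for total_bids.
SELECT day, sum(cnt) AS total_bids  -- global phase: shuffle by day,
FROM (                              -- then sum the partial counts
  SELECT to_char(date_time, 'YYYY-MM-DD') AS day,
         shard,                     -- hypothetical stand-in for Vnode(_row_id)
         count(*) AS cnt            -- local phase: no shuffle needed
  FROM bid
  GROUP BY to_char(date_time, 'YYYY-MM-DD'), shard
) partial_counts
GROUP BY day;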
https://buildkite.com/risingwave-test/nexmark-benchmark/builds/3155
https://buildkite.com/risingwave-test/nexmark-benchmark/builds/3156
RW 4X: 3.3M
RW 1X: 923K
Both are almost as high as the throughput of a stateless query.
4X/1X Ratio: 3.57
Both the scalability and the absolute throughput of RW are much better when there is no distinct aggregation.
https://buildkite.com/risingwave-test/flink-nexmark-bench/builds/148
https://buildkite.com/risingwave-test/flink-nexmark-bench/builds/144
Flink 4X: 3.3M
Flink 1X: 1M
4X/1X Ratio: 3.3
We can conclude that q15-no-distinct is a scalable query for both RW and Flink.
With @st1page and @Little-Wallace, we suspected that the aggregation dirty heap size, 64MB by default, is limiting the throughput. The reason is that, given the group-by columns (day, bidder, and auction) and the data pattern, the skew is strong: all the data go to a single partial aggregation operator at any given time.
We found that the aggregation dirty heap size fluctuates around 64MB:
The figure above comes from:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1709731899000&to=1709733586000&var-namespace=nexmark-ht-4x-1cn-affinity-10s
Flink has a similar knob to tune.
https://github.com/risingwavelabs/kube-bench/blob/main/env.toml#L658-L663
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/tuning/
However, after we set stream_hash_agg_max_dirty_groups_heap_size to 268435456, aka 256MB, it does not change much:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1709736387000&orgId=1&to=1709738065000&var-datasource=Prometheus:+test-useast1-eks-a&var-namespace=nexmark-ht-4x-1cn-affinity-10s
The throughput does not change.
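For reference, the change amounted to the following; this is a sketch that assumes the limit is exposed as a variable settable via SET under exactly this name:

-- Sketch: raising the agg dirty-heap limit from the 64MB default to 256MB.
-- Assumes stream_hash_agg_max_dirty_groups_heap_size is settable via SET.
SET stream_hash_agg_max_dirty_groups_heap_size = 268435456;  -- 256 * 1024 * 1024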
The two executions mentioned in this thread can be found on Metabase: http://metabase.risingwave-cloud.xyz/question/9236-nexmark-q15-blackhole-4x-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-2763?start_date=2024-01-04
#15696 has improved the throughput by about 20%.