zhangqingru created FLINK-20345: ----------------------------------- Summary: Adds an Expand node only when there are more then one distinct aggregate function in an Aggregate when executes SplitAggregateRule Key: FLINK-20345 URL: https://issues.apache.org/jira/browse/FLINK-20345 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.11.2 Reporter: zhangqingru Fix For: 1.11.3
As mentioned in [Flink Document|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html], we could split distinct aggregation to solve skew data on distinct keys which is a very good optimization. However, an unnecessary `Expand` node will be generated under some special cases, like the following sql. {code:java} SELECT COUNT(c) AS pv, COUNT(DISTINCT c) AS uv FROM T GROUP BY a {code} Which plan is like the following text, the Expand and filter condition in aggregate functions could be removed. {code:java} Sink(name=[DataStreamTableSink], fields=[pv, uv]) +- Calc(select=[pv, uv]) +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, $SUM0_RETRACT($f3) AS $f2]) +- Exchange(distribution=[hash[a]]) +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(c) FILTER $g_1 AS $f2_0, COUNT(DISTINCT c) FILTER $g_0 AS $f3]) +- Exchange(distribution=[hash[a, $f2]]) +- Calc(select=[a, c, $f2, =($e, 1) AS $g_1, =($e, 0) AS $g_0]) +- Expand(projects=[{a=[$0], c=[$1], $f2=[$2], $e=[0]}, {a=[$0], c=[$1], $f2=[null], $e=[1]}]) +- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2]) +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]) +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c]){code} An `Expand` node only is necessary when multiple aggregate function with different distinct keys appears in one Aggregate. -- This message was sent by Atlassian Jira (v8.3.4#803005)