[ 
https://issues.apache.org/jira/browse/FLINK-34238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34238:
---------------------------
    Description: 
Take the following plan as an example:
{code:java}
Calc(select=[window_start, window_end, a, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
+- WindowAggregate(groupBy=[a], window=[SESSION(win_start=[window_start], win_end=[window_end], gap=[5 min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[a]])
      +- Calc(select=[a, window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)])
         +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min], partition keys=[a])])
            +- Exchange(distribution=[hash[a]])
               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) {code}
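If the `WindowTableFunction`, `Calc` and `WindowAggregate` nodes can ultimately 
be chained, the `Exchange` between `Calc` and `WindowAggregate` can be removed: 
the input is already hash-distributed on `a` by the `Exchange` below 
`WindowTableFunction`, and `Calc` passes `a` through unchanged.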
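
The condition above can be illustrated with a small sketch. This is not the Flink planner's implementation: `Node`, `FORWARDING_KINDS` and `redundant_exchanges` are hypothetical names, the plan is simplified to a single chain of nodes listed root first, and the sketch assumes that `Calc` and `WindowTableFunction` keep every record on the subtask it arrived on.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Node:
    """One node of a linear plan, listed root first as in the plan above."""
    kind: str                           # e.g. "Exchange", "Calc"
    hash_keys: Tuple[str, ...] = ()     # only meaningful for Exchange nodes
    output_fields: Tuple[str, ...] = () # only meaningful for Calc nodes


# Assumption: these node kinds never move records between subtasks.
FORWARDING_KINDS = {"Calc", "WindowTableFunction"}


def redundant_exchanges(plan: List[Node]) -> List[int]:
    """Return indices of Exchange nodes that repeat the hash distribution
    of the nearest Exchange below them, with only forwarding nodes between."""
    redundant = []
    for i, node in enumerate(plan):
        if node.kind != "Exchange":
            continue
        for below in plan[i + 1:]:
            if below.kind == "Exchange":
                # Same keys below: the data is already partitioned correctly.
                if below.hash_keys == node.hash_keys:
                    redundant.append(i)
                break
            if below.kind not in FORWARDING_KINDS:
                break
            # A Calc must still emit every hash key, otherwise the
            # partitioning established below is no longer guaranteed.
            if below.kind == "Calc" and not set(node.hash_keys) <= set(below.output_fields):
                break
    return redundant


# The example plan, reduced to the attributes the check needs.
plan = [
    Node("Calc", output_fields=("window_start", "window_end", "a", "EXPR$3",
                                "EXPR$4", "EXPR$5", "wAvg", "uv")),
    Node("WindowAggregate"),
    Node("Exchange", hash_keys=("a",)),
    Node("Calc", output_fields=("a", "window_start", "window_end", "d",
                                "$f4", "b", "e", "c")),
    Node("WindowTableFunction"),
    Node("Exchange", hash_keys=("a",)),
    Node("WatermarkAssigner"),
    Node("TableSourceScan"),
]

print(redundant_exchanges(plan))  # [2]
```

Only the upper `Exchange` (index 2) is reported. The lower one stays, because nothing beneath it in this plan establishes a hash distribution on `a`.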

> In streaming mode, redundant exchange nodes can be optimally deleted in some 
> cases
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-34238
>                 URL: https://issues.apache.org/jira/browse/FLINK-34238
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: xuyang
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
