[ https://issues.apache.org/jira/browse/FLINK-15577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kurt Young closed FLINK-15577.
------------------------------
    Fix Version/s: 1.10.0
                   1.9.2
       Resolution: Fixed

1.9.2: 578a70901230e83351287ffa6b73b27f5a16d8ad
1.10.0: 98d209eab2f6e6cad0bc678876a36090c774082b
master: 244718553742c086eefc95f927d7b26af597d40a

> WindowAggregate RelNodes missing Window specs in digest
> -------------------------------------------------------
>
>                 Key: FLINK-15577
>                 URL: https://issues.apache.org/jira/browse/FLINK-15577
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Legacy Planner
>    Affects Versions: 1.9.1
>            Reporter: Benoit Hanotte
>            Assignee: Benoit Hanotte
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.9.2, 1.10.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> The RelNode's digest (AbstractRelNode.getDigest()), along with its RowType,
> is used by the Calcite HepPlanner to avoid adding duplicate Vertices to the
> graph. If an equivalent vertex is already present in the graph, then that
> vertex is used in place of the newly generated one:
> https://github.com/apache/calcite/blob/branch-1.21/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java#L828
> This means that *the digest needs to contain all the information necessary to
> identify a vertex and distinguish it from similar - but not equivalent -
> vertices*.
> In the case of `LogicalWindowAggregation` and
> `FlinkLogicalWindowAggregation`, the window specs are currently not in the
> digest, meaning that two aggregations with the same signatures and
> expressions but different windows are considered equivalent by the planner,
> which is not correct and will lead to an invalid Physical Plan.
> For instance, the following query would give an invalid plan:
> {code}
> WITH window_1h AS (
>     SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) as `timestamp`
>     FROM my_table
>     GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
> ),
> window_2h AS (
>     SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
>     FROM my_table
>     GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
> )
> (SELECT * FROM window_1h)
> UNION ALL
> (SELECT * FROM window_2h)
> {code}
> The invalid plan generated by the planner is the following (*Please note the
> windows in the two DataStreamGroupWindowAggregates nodes being the same when
> they should be different*):
> {code}
> DataStreamUnion(all=[true], union all=[timestamp]): rowcount = 200.0, cumulative cost = {800.0 rows, 802.0 cpu, 0.0 io}, id = 176
>   DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 173
>     DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 172
>       DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
>   DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 175
>     DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 174
>       DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
> {code}
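The digest-based deduplication described in the issue can be modelled with a small standalone sketch. This is not Calcite or Flink code: `WindowAgg`, `VertexRegistry` and both digest methods are hypothetical names invented for illustration, and the registry is only a simplified stand-in for the planner's digest lookup. It registers a 1-hour and a 2-hour sliding window over the same input, once with a digest that omits the window spec and once with a digest that includes it.

```java
import java.util.HashMap;
import java.util.Map;

public class DigestDedupSketch {

    /** Hypothetical stand-in for a window aggregate node (not a Calcite/Flink class). */
    static final class WindowAgg {
        final String input;
        final long sizeMillis;
        final long slideMillis;

        WindowAgg(String input, long sizeMillis, long slideMillis) {
            this.input = input;
            this.sizeMillis = sizeMillis;
            this.slideMillis = slideMillis;
        }

        /** Digest in the spirit of the bug report: the window spec is left out. */
        String digestWithoutWindow() {
            return "WindowAgg(input=" + input + ")";
        }

        /** Digest that also records the window spec. */
        String digestWithWindow() {
            return "WindowAgg(input=" + input
                    + ", window=Sliding(size=" + sizeMillis + ", slide=" + slideMillis + "))";
        }
    }

    /** Simplified model of a planner graph that reuses a vertex when the digest is already known. */
    static final class VertexRegistry {
        private final Map<String, WindowAgg> byDigest = new HashMap<>();

        WindowAgg register(String digest, WindowAgg node) {
            WindowAgg existing = byDigest.putIfAbsent(digest, node);
            return existing != null ? existing : node;
        }

        int size() {
            return byDigest.size();
        }
    }

    /** Registers the two windows from the example query and counts the distinct vertices kept. */
    public static int distinctVertices(boolean includeWindowInDigest) {
        WindowAgg oneHour = new WindowAgg("my_table", 3_600_000L, 3_600_000L);
        WindowAgg twoHour = new WindowAgg("my_table", 7_200_000L, 3_600_000L);
        VertexRegistry registry = new VertexRegistry();
        for (WindowAgg node : new WindowAgg[] {oneHour, twoHour}) {
            String digest = includeWindowInDigest
                    ? node.digestWithWindow()
                    : node.digestWithoutWindow();
            registry.register(digest, node);
        }
        return registry.size();
    }

    public static void main(String[] args) {
        System.out.println("without window in digest: " + distinctVertices(false)); // 1
        System.out.println("with window in digest:    " + distinctVertices(true));  // 2
    }
}
```

With the window spec missing from the digest, the second aggregation collapses onto the first, which mirrors the two identical SlidingGroupWindow nodes in the plan above. Once the window is part of the digest, both vertices are kept.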
--
This message was sent by Atlassian Jira
(v8.3.4#803005)