[ https://issues.apache.org/jira/browse/FLINK-32578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744847#comment-17744847 ]
lincoln lee commented on FLINK-32578:
-------------------------------------

Fixed in 1.17: dc8b70c2fcbb429a27a9cc1e263d9a38c2d7da34

> Cascaded group by window time columns on a proctime window aggregate may
> result hang for ever
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32578
>                 URL: https://issues.apache.org/jira/browse/FLINK-32578
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.1
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.18.0, 1.17.2
>
>
> Currently, grouping by the window time columns (window_start, window_end) of
> a proctime window aggregate result produces a wrong plan, which may cause the
> job to hang forever at runtime.
> For example, in this query:
> {code}
> insert into s1
> SELECT
>   window_start,
>   window_end,
>   sum(cnt),
>   count(*)
> FROM (
>   SELECT
>     a,
>     b,
>     window_start,
>     window_end,
>     count(*) as cnt,
>     sum(d) as sum_d,
>     max(d) as max_d
>   FROM TABLE(TUMBLE(TABLE src1, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
>   GROUP BY a, window_start, window_end, b
> )
> GROUP BY a, window_start, window_end
> {code}
> the inner proctime window works fine. The outer one does not, because the
> wrong plan translates it into an unexpected event-time-mode window operator:
> {code}
> Sink(table=[default_catalog.default_database.s1], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, CAST(EXPR$2 AS BIGINT) AS b, CAST(EXPR$3 AS BIGINT) AS c])
>    +- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
>       +- Exchange(distribution=[hash[a]])
>          +- Calc(select=[a, window_start, window_end, cnt])
>             +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS cnt, start('w$) AS window_start, end('w$) AS window_end])
>                +- Exchange(distribution=[hash[a, b]])
>                   +- Calc(select=[a, b, d, PROCTIME() AS proctime])
>                      +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a, b, d], metadata=[]]], fields=[a, b, d])
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
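The sketch below is a toy Python model of one plausible reading of "hang forever". It is not Flink code, and the names `tumble_start`, `OuterWindowAgg` and `run` are hypothetical. The model rests on the following assumptions:

* The inner proctime window fires on the processing-time clock.
* An outer operator planned in event-time mode fires only when a watermark reaches the window's last millisecond (window_end - 1).
* No watermarks ever arrive. The plan above has no watermark assigner, which is what this assumption relies on.

Under these assumptions, the event-time-mode outer aggregate never emits anything.

```python
# Toy model, not Flink code: why an event-time-mode window operator can wait
# forever in a pipeline that only has processing time and no watermarks.
from collections import defaultdict

WINDOW_MS = 5 * 60 * 1000  # TUMBLE size: INTERVAL '5' MINUTE


def tumble_start(ts_ms, size_ms=WINDOW_MS):
    """Start of the tumbling window that contains ts_ms."""
    return ts_ms - ts_ms % size_ms


class OuterWindowAgg:
    """Hypothetical outer aggregate: GROUP BY a, window_start, window_end.

    mode="proctime": windows fire on the processing-time clock.
    mode="event": windows fire only on a watermark (like the plan's outer operator).
    In this model a window fires once the relevant time reaches window_end - 1.
    """

    def __init__(self, mode):
        self.mode = mode
        self.acc = defaultdict(lambda: [0, 0])  # (a, ws, we) -> [sum(cnt), count(*)]
        self.out = []

    def on_row(self, a, ws, we, cnt):
        acc = self.acc[(a, ws, we)]
        acc[0] += cnt
        acc[1] += 1

    def _fire_up_to(self, t):
        # Emit (window_start, window_end, sum(cnt), count(*)) for every closed window.
        for key in sorted(k for k in self.acc if k[2] - 1 <= t):
            _, ws, we = key
            total, rows = self.acc.pop(key)
            self.out.append((ws, we, total, rows))

    def on_processing_time(self, now_ms):
        if self.mode == "proctime":
            self._fire_up_to(now_ms)

    def on_watermark(self, wm_ms):
        if self.mode == "event":
            self._fire_up_to(wm_ms)


def run(events, now_ms, outer_mode):
    """events: (proc_ms, a, b) rows. Assumes no watermark is ever emitted."""
    inner = defaultdict(int)  # inner proctime window: (a, b, ws) -> count(*)
    for proc_ms, a, b in sorted(events):
        inner[(a, b, tumble_start(proc_ms))] += 1

    outer = OuterWindowAgg(outer_mode)
    # Advancing the processing-time clock to now_ms fires the closed inner windows.
    for (a, b, ws), cnt in sorted(inner.items()):
        if ws + WINDOW_MS - 1 <= now_ms:
            outer.on_row(a, ws, ws + WINDOW_MS, cnt)
    outer.on_processing_time(now_ms)
    # No outer.on_watermark(...) call: nothing upstream generates watermarks.
    return outer.out


if __name__ == "__main__":
    events = [(1_000, 1, "x"), (2_000, 1, "y"), (3_000, 1, "x"), (400_000, 2, "x")]
    print(run(events, 600_000, "proctime"))  # [(0, 300000, 3, 2), (300000, 600000, 1, 1)]
    print(run(events, 600_000, "event"))  # []
```

The same inner results reach both outer variants. Only the one driven by processing time emits them. The event-time-mode variant keeps its state indefinitely, which matches the symptom described in the issue.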