[ 
https://issues.apache.org/jira/browse/FLINK-32578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744847#comment-17744847
 ] 

lincoln lee commented on FLINK-32578:
-------------------------------------

Fixed in 1.17: dc8b70c2fcbb429a27a9cc1e263d9a38c2d7da34

> Cascaded group by window time columns on a proctime window aggregate may 
> hang forever
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32578
>                 URL: https://issues.apache.org/jira/browse/FLINK-32578
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.1
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.18.0, 1.17.2
>
>
> Currently, grouping by the window time columns on the result of a proctime 
> window aggregate produces a wrong plan, which may cause the job to hang 
> forever at runtime.
> For example, for such a query:
> {code}
> insert into s1
> SELECT
>   window_start,
>   window_end,
>   sum(cnt),
>   count(*)
> FROM (
>  SELECT
>     a,
>     b,
>     window_start,
>     window_end,
>     count(*) as cnt,
>     sum(d) as sum_d,
>     max(d) as max_d
>  FROM TABLE(TUMBLE(TABLE src1, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
>  GROUP BY a, window_start, window_end, b
> )
> GROUP BY a, window_start, window_end
> {code}
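> The exact DDL of src1 and s1 is not part of this report; a minimal setup to 
> reproduce could look like the following sketch (connectors and column types 
> are assumptions inferred from the query above and the plan below):
> {code}
> -- hypothetical source table: columns a, b, d plus a processing-time attribute
> CREATE TEMPORARY TABLE src1 (
>   a INT,
>   b STRING,
>   d BIGINT,
>   proctime AS PROCTIME()
> ) WITH (
>   'connector' = 'datagen'
> );
>
> -- hypothetical sink matching the fields [ws, we, b, c] of the plan below
> CREATE TEMPORARY TABLE s1 (
>   ws TIMESTAMP(6),
>   we TIMESTAMP(6),
>   b BIGINT,
>   c BIGINT
> ) WITH (
>   'connector' = 'blackhole'
> );
> {code}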
> The inner proctime window works fine, but the outer one does not, because 
> the wrong plan translates it into an unexpected event-time mode window 
> operator; since a proctime pipeline emits no watermarks, that operator can 
> never fire, so the job hangs forever:
> {code}
> Sink(table=[default_catalog.default_database.s1], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS 
> TIMESTAMP(6)) AS we, CAST(EXPR$2 AS BIGINT) AS b, CAST(EXPR$3 AS BIGINT) AS 
> c])
>    +- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], 
> win_end=[window_end], size=[5 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) 
> AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
>       +- Exchange(distribution=[hash[a]])
>          +- Calc(select=[a, window_start, window_end, cnt])
>             +- WindowAggregate(groupBy=[a, b], 
> window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS 
> cnt, start('w$) AS window_start, end('w$) AS window_end])
>                +- Exchange(distribution=[hash[a, b]])
>                   +- Calc(select=[a, b, d, PROCTIME() AS proctime])
>                      +- TableSourceScan(table=[[default_catalog, 
> default_database, src1, project=[a, b, d], metadata=[]]], fields=[a, b, d])
> {code}
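> On an affected version, the buggy plan can be inspected without running the 
> job by explaining the same statement (a sketch, reusing the tables above); 
> the telltale sign is the outer WindowAggregate keyed on win_start/win_end 
> instead of a proctime time_col:
> {code}
> EXPLAIN
> insert into s1
> SELECT
>   window_start,
>   window_end,
>   sum(cnt),
>   count(*)
> FROM (
>  SELECT
>     a,
>     b,
>     window_start,
>     window_end,
>     count(*) as cnt,
>     sum(d) as sum_d,
>     max(d) as max_d
>  FROM TABLE(TUMBLE(TABLE src1, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
>  GROUP BY a, window_start, window_end, b
> )
> GROUP BY a, window_start, window_end;
> {code}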


