[jira] [Updated] (FLINK-32501) Wrong execution plan of a proctime window aggregation generated due to incorrect cost evaluation

2023-07-11 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-32501:

Fix Version/s: (was: 1.17.2)

> Wrong execution plan of a proctime window aggregation generated due to 
> incorrect cost evaluation
> 
>
> Key: FLINK-32501
> URL: https://issues.apache.org/jira/browse/FLINK-32501
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.16.2, 1.17.1
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Currently, a window aggregation over a windowing TVF with a filter condition 
> may be planned incorrectly, and the resulting job may hang forever at runtime 
> (the window aggregate operator never emits output).
> For example:
> {code}
> insert into sink
> select
>     window_start,
>     window_end,
>     b,
>     COALESCE(sum(case
>         when a = 11
>         then 1
>     end), 0) c
> from
>     TABLE(
>         TUMBLE(TABLE source, DESCRIPTOR(proctime), INTERVAL '10' SECONDS)
>     )
> where
>     a in (1, 5, 7, 9, 11)
> GROUP BY
>     window_start, window_end, b
> {code}
> This generates a wrong plan that does not merge the proctime WindowTableFunction 
> into the WindowAggregate. When the plan is translated to an execution plan, the 
> WindowAggregate wrongly treats the window as an event-time window, so the 
> WindowAggregateOperator neither receives watermarks nor sets up timers to fire 
> any windows at runtime:
> {code}
> Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
>    +- WindowAggregate(groupBy=[b], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
>       +- Exchange(distribution=[hash[b]])
>          +- Calc(select=[window_start, window_end, b, CASE((a = 11), 1, null:INTEGER) AS $f3], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
>             +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[10 s])])
>                +- Calc(select=[a, b, PROCTIME() AS proctime])
>                   +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
> {code}
> expected plan:
> {code}
> Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
>    +- WindowAggregate(groupBy=[b], window=[TUMBLE(time_col=[proctime], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
>       +- Exchange(distribution=[hash[b]])
>          +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
>             +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
> {code}
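
The difference between the two plans above can be checked mechanically. The following is a minimal sketch in Python, not part of Flink: the helper names (split_plan_nodes, has_unmerged_proctime_window) and the proctime_column parameter are hypothetical, and the check is a plain text heuristic derived only from the two plans quoted in this issue. The plan snippets are trimmed to the operators that matter.

```python
import re


def split_plan_nodes(plan: str) -> list:
    """Split EXPLAIN-style plan text into one string per operator node."""
    # Mail clients wrap long plan lines, so join everything first and then
    # split on the "+- " marker that introduces each child node.
    joined = " ".join(line.strip() for line in plan.splitlines() if line.strip())
    return [node.strip() for node in re.split(r"\+- ", joined) if node.strip()]


def has_unmerged_proctime_window(plan: str, proctime_column: str = "proctime") -> bool:
    """Heuristic: True if the plan shows the symptom described in this issue.

    The symptom is a WindowAggregate that refers to the window only through
    win_start/win_end columns while a WindowTableFunction on the proctime
    column still appears as a separate node.
    """
    nodes = split_plan_nodes(plan)
    separate_tvf = any(
        n.startswith("WindowTableFunction(") and f"time_col=[{proctime_column}]" in n
        for n in nodes
    )
    agg_on_window_columns = any(
        n.startswith("WindowAggregate(") and "win_start=[" in n for n in nodes
    )
    return separate_tvf and agg_on_window_columns


# Operators copied from the wrong plan in this issue (Sink and scan omitted).
WRONG_PLAN = """
WindowAggregate(groupBy=[b], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[b]])
   +- Calc(select=[window_start, window_end, b, CASE((a = 11), 1, null:INTEGER) AS $f3], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
      +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[10 s])])
         +- Calc(select=[a, b, PROCTIME() AS proctime])
"""

# Operators copied from the expected plan in this issue (Sink and scan omitted).
EXPECTED_PLAN = """
WindowAggregate(groupBy=[b], window=[TUMBLE(time_col=[proctime], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[b]])
   +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
"""

if __name__ == "__main__":
    print(has_unmerged_proctime_window(WRONG_PLAN))
    print(has_unmerged_proctime_window(EXPECTED_PLAN))
```

The check flags the first plan and not the second. It only matches operator names and attributes as they are printed here, so it may need adjusting for other window types or plan formats.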



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32501) Wrong execution plan of a proctime window aggregation generated due to incorrect cost evaluation

2023-06-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32501:
---
Labels: pull-request-available  (was: )

> Wrong execution plan of a proctime window aggregation generated due to 
> incorrect cost evaluation
> 
>
> Key: FLINK-32501
> URL: https://issues.apache.org/jira/browse/FLINK-32501
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.16.2, 1.17.1
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0, 1.17.2
>
>


