[jira] [Commented] (FLINK-31165) Over Agg: The window rank function without order by error in top N query
[ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17706329#comment-17706329 ] lincoln lee commented on FLINK-31165: - [~qingyue] thanks for driving this! assigned to you. > Over Agg: The window rank function without order by error in top N query > > > Key: FLINK-31165 > URL: https://issues.apache.org/jira/browse/FLINK-31165 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.16.0 >Reporter: P Rohan Kumar >Assignee: Jane Chan >Priority: Major > > > {code:java} > val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > val td = TableDescriptor.forConnector("datagen").option("rows-per-second", > "10") > .option("number-of-rows", "10") > .schema(Schema > .newBuilder() > .column("NAME", DataTypes.VARCHAR(2147483647)) > .column("ROLLNO", DataTypes.DECIMAL(5, 0)) > .column("DOB", DataTypes.DATE()) > .column("CLASS", DataTypes.DECIMAL(2, 0)) > .column("SUBJECT", DataTypes.VARCHAR(2147483647)) > .build()) > .build() > val table = tableEnv.from(td) > tableEnv.createTemporaryView("temp_table", table) > val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as > date) SRC_NO from temp_table") > tableEnv.createTemporaryView("temp_table2", newTable) > val newTable2 = tableEnv.sqlQuery("select * from (select > NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum > from temp_table2 a) where rownum <= 1") > tableEnv.toChangelogStream(newTable2).print() > env.execute() > {code} > > > I am getting the below error if I run the above code. > I have already provided an order by column. > If I change the order by column to some other column, such as "SUBJECT", then > the job runs fine. 
> > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args > [rel#245:LogicalWindow.NONE.any.None: > 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows > between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) > at > 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219) > at >
[jira] [Commented] (FLINK-31165) Over Agg: The window rank function without order by error in top N query
[ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17706313#comment-17706313 ] Jane Chan commented on FLINK-31165: --- I agree with the error msg improvement, and I'd like to do this task. Cc [~lincoln.86xy] [~godfrey] > Over Agg: The window rank function without order by error in top N query > > > Key: FLINK-31165 > URL: https://issues.apache.org/jira/browse/FLINK-31165 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.16.0 >Reporter: P Rohan Kumar >Priority: Major > > > {code:java} > val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > val td = TableDescriptor.forConnector("datagen").option("rows-per-second", > "10") > .option("number-of-rows", "10") > .schema(Schema > .newBuilder() > .column("NAME", DataTypes.VARCHAR(2147483647)) > .column("ROLLNO", DataTypes.DECIMAL(5, 0)) > .column("DOB", DataTypes.DATE()) > .column("CLASS", DataTypes.DECIMAL(2, 0)) > .column("SUBJECT", DataTypes.VARCHAR(2147483647)) > .build()) > .build() > val table = tableEnv.from(td) > tableEnv.createTemporaryView("temp_table", table) > val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as > date) SRC_NO from temp_table") > tableEnv.createTemporaryView("temp_table2", newTable) > val newTable2 = tableEnv.sqlQuery("select * from (select > NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum > from temp_table2 a) where rownum <= 1") > tableEnv.toChangelogStream(newTable2).print() > env.execute() > {code} > > > I am getting the below error if I run the above code. > I have already provided an order by column. > If I change the order by column to some other column, such as "SUBJECT", then > the job runs fine. 
> > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args > [rel#245:LogicalWindow.NONE.any.None: > 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows > between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) > at > 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219) > at >
[jira] [Commented] (FLINK-31165) Over Agg: The window rank function without order by error in top N query
[ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17706307#comment-17706307 ] Godfrey He commented on FLINK-31165: [~lincoln.86xy] [~qingyue] I prefer to just improve the error message in FlinkLogicalOverAggregateConverter, and it can be: The window rank function requires the order by with variable column. > Over Agg: The window rank function without order by error in top N query > > > Key: FLINK-31165 > URL: https://issues.apache.org/jira/browse/FLINK-31165 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.16.0 >Reporter: P Rohan Kumar >Priority: Major > > > {code:java} > val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > val td = TableDescriptor.forConnector("datagen").option("rows-per-second", > "10") > .option("number-of-rows", "10") > .schema(Schema > .newBuilder() > .column("NAME", DataTypes.VARCHAR(2147483647)) > .column("ROLLNO", DataTypes.DECIMAL(5, 0)) > .column("DOB", DataTypes.DATE()) > .column("CLASS", DataTypes.DECIMAL(2, 0)) > .column("SUBJECT", DataTypes.VARCHAR(2147483647)) > .build()) > .build() > val table = tableEnv.from(td) > tableEnv.createTemporaryView("temp_table", table) > val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as > date) SRC_NO from temp_table") > tableEnv.createTemporaryView("temp_table2", newTable) > val newTable2 = tableEnv.sqlQuery("select * from (select > NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum > from temp_table2 a) where rownum <= 1") > tableEnv.toChangelogStream(newTable2).print() > env.execute() > {code} > > > I am getting the below error if I run the above code. > I have already provided an order by column. > If I change the order by column to some other column, such as "SUBJECT", then > the job runs fine. 
> > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args > [rel#245:LogicalWindow.NONE.any.None: > 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows > between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) > at > 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224) > at >
[jira] [Commented] (FLINK-31165) Over Agg: The window rank function without order by error in top N query
[ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17703843#comment-17703843 ] lincoln lee commented on FLINK-31165: - [~qingyue] If there is a large cost on implementation, then I would prefer to optimize the current error message to prompt user for a clearer indication. [~godfreyhe] WDYT? > Over Agg: The window rank function without order by error in top N query > > > Key: FLINK-31165 > URL: https://issues.apache.org/jira/browse/FLINK-31165 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.16.0 >Reporter: P Rohan Kumar >Priority: Major > > > {code:java} > val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > val td = TableDescriptor.forConnector("datagen").option("rows-per-second", > "10") > .option("number-of-rows", "10") > .schema(Schema > .newBuilder() > .column("NAME", DataTypes.VARCHAR(2147483647)) > .column("ROLLNO", DataTypes.DECIMAL(5, 0)) > .column("DOB", DataTypes.DATE()) > .column("CLASS", DataTypes.DECIMAL(2, 0)) > .column("SUBJECT", DataTypes.VARCHAR(2147483647)) > .build()) > .build() > val table = tableEnv.from(td) > tableEnv.createTemporaryView("temp_table", table) > val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as > date) SRC_NO from temp_table") > tableEnv.createTemporaryView("temp_table2", newTable) > val newTable2 = tableEnv.sqlQuery("select * from (select > NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum > from temp_table2 a) where rownum <= 1") > tableEnv.toChangelogStream(newTable2).print() > env.execute() > {code} > > > I am getting the below error if I run the above code. > I have already provided an order by column. > If I change the order by column to some other column, such as "SUBJECT", then > the job runs fine. 
> > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args > [rel#245:LogicalWindow.NONE.any.None: > 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows > between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) > at > 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224) > at >
[jira] [Commented] (FLINK-31165) Over Agg: The window rank function without order by error in top N query
[ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17703655#comment-17703655 ] Jane Chan commented on FLINK-31165: --- The current behavior under the Flink batch is the same as under streaming mode since this rewrite rule is applied during the LogicalWindow creation. My concern mainly comes from the implementation aspect. The reason that caused this problem is the use of constant folding optimization when creating LogicalWindow, which leads to the orderByKey being empty when passed to FlinkLogicalOverAggregateConverter. There are two possible solutions. The first one is to remove the constant folding optimization or add some judgment here, such as giving up optimization when orderByKey becomes empty after optimization. The second one is to remove the check of orderByKey in FlinkLogicalOverAggregateConverter, but then the problem becomes how to distinguish between order by constants and no order by clause. > Over Agg: The window rank function without order by error in top N query > > > Key: FLINK-31165 > URL: https://issues.apache.org/jira/browse/FLINK-31165 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.16.0 >Reporter: P Rohan Kumar >Priority: Major > > > {code:java} > val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > val td = TableDescriptor.forConnector("datagen").option("rows-per-second", > "10") > .option("number-of-rows", "10") > .schema(Schema > .newBuilder() > .column("NAME", DataTypes.VARCHAR(2147483647)) > .column("ROLLNO", DataTypes.DECIMAL(5, 0)) > .column("DOB", DataTypes.DATE()) > .column("CLASS", DataTypes.DECIMAL(2, 0)) > .column("SUBJECT", DataTypes.VARCHAR(2147483647)) > .build()) > .build() > val table = tableEnv.from(td) > tableEnv.createTemporaryView("temp_table", table) > val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' 
as > date) SRC_NO from temp_table") > tableEnv.createTemporaryView("temp_table2", newTable) > val newTable2 = tableEnv.sqlQuery("select * from (select > NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum > from temp_table2 a) where rownum <= 1") > tableEnv.toChangelogStream(newTable2).print() > env.execute() > {code} > > > I am getting the below error if I run the above code. > I have already provided an order by column. > If I change the order by column to some other column, such as "SUBJECT", then > the job runs fine. > > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args > [rel#245:LogicalWindow.NONE.any.None: > 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows > between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) > at >
[jira] [Commented] (FLINK-31165) Over Agg: The window rank function without order by error in top N query
[ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17703643#comment-17703643 ] lincoln lee commented on FLINK-31165: - [~qingyue] can you verify the behavior of this case under flink batch? Personally I prefer to keep a unified behavior on streaming and batch, non-determinism should not be the only reason to reject the query, because similar proctime based computations on streaming are also mostly non-deterministic, WDYT? > Over Agg: The window rank function without order by error in top N query > > > Key: FLINK-31165 > URL: https://issues.apache.org/jira/browse/FLINK-31165 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.16.0 >Reporter: P Rohan Kumar >Priority: Major > > > {code:java} > val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > val td = TableDescriptor.forConnector("datagen").option("rows-per-second", > "10") > .option("number-of-rows", "10") > .schema(Schema > .newBuilder() > .column("NAME", DataTypes.VARCHAR(2147483647)) > .column("ROLLNO", DataTypes.DECIMAL(5, 0)) > .column("DOB", DataTypes.DATE()) > .column("CLASS", DataTypes.DECIMAL(2, 0)) > .column("SUBJECT", DataTypes.VARCHAR(2147483647)) > .build()) > .build() > val table = tableEnv.from(td) > tableEnv.createTemporaryView("temp_table", table) > val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as > date) SRC_NO from temp_table") > tableEnv.createTemporaryView("temp_table2", newTable) > val newTable2 = tableEnv.sqlQuery("select * from (select > NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum > from temp_table2 a) where rownum <= 1") > tableEnv.toChangelogStream(newTable2).print() > env.execute() > {code} > > > I am getting the below error if I run the above code. > I have already provided an order by column. 
> If I change the order by column to some other column, such as "SUBJECT", then > the job runs fine. > > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args > [rel#245:LogicalWindow.NONE.any.None: > 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows > between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195) > at >
[jira] [Commented] (FLINK-31165) Over Agg: The window rank function without order by error in top N query
[ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17703499#comment-17703499 ] Jane Chan commented on FLINK-31165: --- I rethink it, from the streaming semantics the result might be non-deterministic if supporting order by constants. So I suggest throwing a meaningful error to indicate users not to use constants as the order by key. WDYT? [~godfrey] [~lincoln.86xy] > Over Agg: The window rank function without order by error in top N query > > > Key: FLINK-31165 > URL: https://issues.apache.org/jira/browse/FLINK-31165 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.16.0 >Reporter: P Rohan Kumar >Priority: Major > > > {code:java} > val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > val td = TableDescriptor.forConnector("datagen").option("rows-per-second", > "10") > .option("number-of-rows", "10") > .schema(Schema > .newBuilder() > .column("NAME", DataTypes.VARCHAR(2147483647)) > .column("ROLLNO", DataTypes.DECIMAL(5, 0)) > .column("DOB", DataTypes.DATE()) > .column("CLASS", DataTypes.DECIMAL(2, 0)) > .column("SUBJECT", DataTypes.VARCHAR(2147483647)) > .build()) > .build() > val table = tableEnv.from(td) > tableEnv.createTemporaryView("temp_table", table) > val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as > date) SRC_NO from temp_table") > tableEnv.createTemporaryView("temp_table2", newTable) > val newTable2 = tableEnv.sqlQuery("select * from (select > NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum > from temp_table2 a) where rownum <= 1") > tableEnv.toChangelogStream(newTable2).print() > env.execute() > {code} > > > I am getting the below error if I run the above code. > I have already provided an order by column. > If I change the order by column to some other column, such as "SUBJECT", then > the job runs fine. 
> > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args > [rel#245:LogicalWindow.NONE.any.None: > 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows > between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) > at > 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224) > at >
[jira] [Commented] (FLINK-31165) Over Agg: The window rank function without order by error in top N query
[ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17703466#comment-17703466 ] Jane Chan commented on FLINK-31165: --- Hi [~rohankrao] , thanks for reporting this issue. Actually, the order by field SRC_NO is a constant and is folded during query rewrite. {code:java} LogicalProject(inputs=[0..2]) +- LogicalFilter(condition=[<=($2, 1)]) +- LogicalProject(inputs=[0..1], exprs=[[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY 2022-01-01 NULLS FIRST)]]) +- LogicalTableScan(table=[[default_catalog, default_database, temp_table]]) {code} From the perspective of SQL semantics, using a constant as the order by field for row_number has no meaning, because the constant will not change the sorting result of row_number, and each row will get the same rank. As a current workaround, please try to specify another field to ensure that row_number is sorted in the correct order. While I tested the query against MySQL and PostgreSQL, they do allow this to happen, and just output the first inserted row. Do you think we need to align this behavior? 
cc [~godfreyhe] [~lincoln.86xy] > Over Agg: The window rank function without order by error in top N query > > > Key: FLINK-31165 > URL: https://issues.apache.org/jira/browse/FLINK-31165 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.16.0 >Reporter: P Rohan Kumar >Priority: Major > > > {code:java} > val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > val td = TableDescriptor.forConnector("datagen").option("rows-per-second", > "10") > .option("number-of-rows", "10") > .schema(Schema > .newBuilder() > .column("NAME", DataTypes.VARCHAR(2147483647)) > .column("ROLLNO", DataTypes.DECIMAL(5, 0)) > .column("DOB", DataTypes.DATE()) > .column("CLASS", DataTypes.DECIMAL(2, 0)) > .column("SUBJECT", DataTypes.VARCHAR(2147483647)) > .build()) > .build() > val table = tableEnv.from(td) > tableEnv.createTemporaryView("temp_table", table) > val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as > date) SRC_NO from temp_table") > tableEnv.createTemporaryView("temp_table2", newTable) > val newTable2 = tableEnv.sqlQuery("select * from (select > NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum > from temp_table2 a) where rownum <= 1") > tableEnv.toChangelogStream(newTable2).print() > env.execute() > {code} > > > I am getting the below error if I run the above code. > I have already provided an order by column. > If I change the order by column to some other column, such as "SUBJECT", then > the job runs fine. 
> > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args > [rel#245:LogicalWindow.NONE.any.None: > 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows > between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at >
[jira] [Commented] (FLINK-31165) Over Agg: The window rank function without order by error in top N query
[ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691493#comment-17691493 ] Martijn Visser commented on FLINK-31165: [~godfrey] [~lincoln.86xy] Any thoughts on this one? > Over Agg: The window rank function without order by error in top N query > > > Key: FLINK-31165 > URL: https://issues.apache.org/jira/browse/FLINK-31165 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.16.0 >Reporter: P Rohan Kumar >Priority: Major > > > {code:java} > val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > val td = TableDescriptor.forConnector("datagen").option("rows-per-second", > "10") > .option("number-of-rows", "10") > .schema(Schema > .newBuilder() > .column("NAME", DataTypes.VARCHAR(2147483647)) > .column("ROLLNO", DataTypes.DECIMAL(5, 0)) > .column("DOB", DataTypes.DATE()) > .column("CLASS", DataTypes.DECIMAL(2, 0)) > .column("SUBJECT", DataTypes.VARCHAR(2147483647)) > .build()) > .build() > val table = tableEnv.from(td) > tableEnv.createTemporaryView("temp_table", table) > val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as > date) SRC_NO from temp_table") > tableEnv.createTemporaryView("temp_table2", newTable) > val newTable2 = tableEnv.sqlQuery("select * from (select > NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum > from temp_table2 a) where rownum <= 1") > tableEnv.toChangelogStream(newTable2).print() > env.execute() > {code} > > > I am getting the below error if I run the above code. > I have already provided an order by column. > If I change the order by column to some other column, such as "SUBJECT", then > the job runs fine. 
> > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args > [rel#245:LogicalWindow.NONE.any.None: > 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows > between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) > at > 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219) > at >