[jira] [Commented] (FLINK-28650) Flink SQL Parsing bug for METADATA
[ https://issues.apache.org/jira/browse/FLINK-28650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775580#comment-17775580 ] Shuai Xu commented on FLINK-28650: -- The bug in partial insert with writable metadata is fixed in FLINK-30922. > Flink SQL Parsing bug for METADATA > -- > > Key: FLINK-28650 > URL: https://issues.apache.org/jira/browse/FLINK-28650 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.4 >Reporter: Jun Qin >Priority: Major > Fix For: 1.19.0 > > > With the following source/sink tables: > {code:sql} > CREATE TABLE sourceTable ( > `key` INT, > `time` TIMESTAMP(3), > `value` STRING NOT NULL, > id INT > ) > WITH ( > 'connector' = 'datagen', > 'rows-per-second'='10', > 'fields.id.kind'='sequence', > 'fields.id.start'='1', > 'fields.id.end'='100' > ); > CREATE TABLE sinkTable1 ( > `time` TIMESTAMP(3) METADATA FROM 'timestamp', > `value` STRING NOT NULL > ) > WITH ( > 'connector' = 'kafka', > ... > ); > CREATE TABLE sinkTable2 ( > `time` TIMESTAMP(3), -- without METADATA > `value` STRING NOT NULL > ) > WITH ( > 'connector' = 'kafka', > ... > ); > {code} > the following three pass the validation: > {code:sql} > INSERT INTO sinkTable1 > SELECT > `time`, > `value` > FROM sourceTable; > INSERT INTO sinkTable2 > SELECT > `time`, > `value` > FROM sourceTable; > INSERT INTO sinkTable2 (`time`,`value`) > SELECT > `time`, > `value` > FROM sourceTable; > {code} > but this one does not: > {code:sql} > INSERT INTO sinkTable1 (`time`,`value`) > SELECT > `time`, > `value` > FROM sourceTable; > {code} > It fails with > {code:java} > Unknown target column 'time' > {code} > It seems that when column names are provided in INSERT, METADATA columns have an undesired effect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-28650) Flink SQL Parsing bug for METADATA
[ https://issues.apache.org/jira/browse/FLINK-28650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775580#comment-17775580 ] Shuai Xu edited comment on FLINK-28650 at 10/16/23 7:58 AM: This bug is fixed in FLINK-30922. was (Author: JIRAUSER300096): The bug in partial insert with writable metadata is fixed in FLINK-30922.
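Until a release containing the FLINK-30922 fix is picked up, the passing cases quoted in the report suggest a workaround: omit the explicit target column list so the metadata column is matched positionally. A sketch against the tables defined in the report:

```sql
-- Workaround sketch (uses the sourceTable/sinkTable1 definitions from the report).
-- The partial insert INSERT INTO sinkTable1 (`time`, `value`) ... fails with
-- "Unknown target column 'time'", but the same write without a column list passes:
INSERT INTO sinkTable1
SELECT
  `time`,
  `value`
FROM sourceTable;
```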
[jira] [Commented] (FLINK-25054) Improve exception message for unsupported hashLength for SHA2 function
[ https://issues.apache.org/jira/browse/FLINK-25054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756784#comment-17756784 ] Shuai Xu commented on FLINK-25054: -- Hi, I would like to fix this issue. Could it be assigned to me, please? > Improve exception message for unsupported hashLength for SHA2 function > -- > > Key: FLINK-25054 > URL: https://issues.apache.org/jira/browse/FLINK-25054 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.12.3 >Reporter: DingGeGe >Priority: Major > Attachments: image-2021-11-25-16-59-56-699.png > > Original Estimate: 1h > Remaining Estimate: 1h > > 【exception sql】 > SELECT > SHA2(, 128) > FROM > > 【effect】 > when the SQL is long, it is hard to tell where the problem is > 【reason】 > the built-in function SHA2 does not support a hashLength of "128", but I could not tell that from the exception log > 【Exception log】 > !image-2021-11-25-16-59-56-699.png!
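For context, the SHA-2 family only defines 224-, 256-, 384- and 512-bit digests, which is why a hashLength of 128 is rejected. A small sketch of the failing and the working call (column and table names are placeholders, not taken from the ticket):

```sql
-- Fails validation: 128 is not a SHA-2 digest length
SELECT SHA2(some_field, 128) FROM some_table;

-- Passes: the supported hashLength values are 224, 256, 384 and 512
SELECT SHA2(some_field, 256) FROM some_table;
```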
[jira] [Commented] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation
[ https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756785#comment-17756785 ] Shuai Xu commented on FLINK-27741: -- Hi, I would like to fix this issue. Could it be assigned to me, please? > Fix NPE when use dense_rank() and rank() in over aggregation > > > Key: FLINK-27741 > URL: https://issues.apache.org/jira/browse/FLINK-27741 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: chenzihao >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > > There is a 'NullPointerException' when using RANK() and DENSE_RANK() in an over > window. > {code:java} > @Test > def testDenseRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY > proctime) FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > {code:java} > @Test > def testRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) > FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > Exception Info: > {code:java} > java.lang.NullPointerException > at > scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248) > at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248) > at scala.collection.SeqLike.size(SeqLike.scala:104) > at scala.collection.SeqLike.size$(SeqLike.scala:104) > at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242) > at >
scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95) > at > scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242) > at scala.collection.mutable.Builder.sizeHint(Builder.scala:77) > at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76) > at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21) > at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229) > at scala.collection.TraversableLike.map(TraversableLike.scala:232) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361) > at > 
org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:279) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:198) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148) > at >
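For reference, the stack trace shows the NPE is thrown while the plan is being translated, before either function's semantics are evaluated. The intended semantics that a fix should restore, sketched on the failing query shape (illustrative only):

```sql
-- Within each partition, RANK() leaves gaps after ties while DENSE_RANK() does not:
-- for ORDER BY values 10, 10, 20 -> RANK: 1, 1, 3; DENSE_RANK: 1, 1, 2.
SELECT a,
       RANK()       OVER (PARTITION BY a ORDER BY proctime) AS rnk,
       DENSE_RANK() OVER (PARTITION BY a ORDER BY proctime) AS dense_rnk
FROM MyTable;
```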
[jira] [Comment Edited] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation
[ https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756785#comment-17756785 ] Shuai Xu edited comment on FLINK-27741 at 8/21/23 9:58 AM: --- Hi [~chenzihao], it appears that your pull request has not been approved. Would you like to continue working on it? If not, I would be happy to fix it. was (Author: JIRAUSER300096): Hi, I would like to fix this issue. Could it be assigned to me, please?
[jira] [Commented] (FLINK-28866) Use DDL instead of legacy method to register the test source in JoinITCase
[ https://issues.apache.org/jira/browse/FLINK-28866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757169#comment-17757169 ] Shuai Xu commented on FLINK-28866: -- Hi, I would like to fix this issue. Could it be assigned to me? > Use DDL instead of legacy method to register the test source in JoinITCase > -- > > Key: FLINK-28866 > URL: https://issues.apache.org/jira/browse/FLINK-28866 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: godfrey he >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
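A sketch of what the requested migration might look like. Instead of registering the test source through the legacy `registerTable` path, the test would declare it with DDL; the `values` connector and its options here are an assumption for illustration, not taken from the ticket:

```sql
-- Hypothetical DDL-based registration of a test source for JoinITCase;
-- 'data-id' would point at data registered with the test values factory.
CREATE TABLE T1 (
  a INT,
  b BIGINT,
  c STRING
) WITH (
  'connector' = 'values',
  'data-id' = '...'
);
```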
[jira] [Comment Edited] (FLINK-31952) Support 'EXPLAIN' statement for CompiledPlan
[ https://issues.apache.org/jira/browse/FLINK-31952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717031#comment-17717031 ] Shuai Xu edited comment on FLINK-31952 at 4/27/23 7:31 AM: --- Hi Jane, I have some ideas for the task; here is how I see it. Purpose of the task: support explain [plan for | ] '/path-to-json.json'. The steps I think would resolve the task: 1. Modify the parserImpl.ftl template to add syntax support, which can be verified by FlinkSqlParserImplTest. 2. Translate the SQL to an Operation. 3. TableEnv#explainPlan is an existing API; we just get the internal plan from the Operation and pass it to TableEnv#explainPlan. 4. Finally, add tests in SqlOperationConverterTest and TableEnvironmentTest for verification. Could I take the task? was (Author: JIRAUSER300096): Hi Jane, I have some ideas for the task; here is how I see it. Purpose of the task: support explain [plan for | ] '/path-to-json.json'. The steps I think would resolve the task: 1. Modify the parserImpl.ftl template to add syntax support, which can be verified by FlinkSqlParserImplTest. 2. Translate the SQL to an Operation. 3. TableEnv#explainPlan is an existing API; we just get the internal plan from the Operation and pass it to TableEnv#explainPlan. 4. Finally, add tests in SqlOperationConverterTest and TableEnvironmentTest for verification. Could I take the task? > Support 'EXPLAIN' statement for CompiledPlan > > > Key: FLINK-31952 > URL: https://issues.apache.org/jira/browse/FLINK-31952 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Jane Chan >Priority: Major > Fix For: 1.18.0 > > > Support the explain SQL syntax towards serialized CompiledPlan > {code:sql} > EXPLAIN [ | PLAN FOR] > {code}
[jira] [Commented] (FLINK-31952) Support 'EXPLAIN' statement for CompiledPlan
[ https://issues.apache.org/jira/browse/FLINK-31952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717031#comment-17717031 ] Shuai Xu commented on FLINK-31952: -- Hi Jane, I have some ideas for the task; here is how I see it. Purpose of the task: support explain [plan for | ] '/path-to-json.json'. The steps I think would resolve the task: 1. Modify the parserImpl.ftl template to add syntax support, which can be verified by FlinkSqlParserImplTest. 2. Translate the SQL to an Operation. 3. TableEnv#explainPlan is an existing API; we just get the internal plan from the Operation and pass it to TableEnv#explainPlan. 4. Finally, add tests in SqlOperationConverterTest and TableEnvironmentTest for verification. Could I take the task?
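Putting the proposal together with the existing COMPILE PLAN statement, usage would look roughly as follows. The path and table names are placeholders, and the EXPLAIN form is the one proposed in this ticket, not yet released syntax:

```sql
-- Existing: serialize the plan of an INSERT statement to JSON
COMPILE PLAN '/tmp/plan.json' FOR
INSERT INTO sinkTable SELECT * FROM sourceTable;

-- Proposed: explain the serialized CompiledPlan
EXPLAIN PLAN FOR '/tmp/plan.json';
```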
[jira] [Commented] (FLINK-31956) Extend the CompiledPlan to read from/write to Flink's FileSystem
[ https://issues.apache.org/jira/browse/FLINK-31956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717082#comment-17717082 ] Shuai Xu commented on FLINK-31956: -- Hi Jane, I have taken FLINK-31952, which is similar to this task. I think it could be achieved by modifying the StreamPlanner and TableEnvironmentImpl. Could I take the task? > Extend the CompiledPlan to read from/write to Flink's FileSystem > > > Key: FLINK-31956 > URL: https://issues.apache.org/jira/browse/FLINK-31956 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Client, Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Jane Chan >Priority: Major > Fix For: 1.18.0 > > > At present, COMPILE/EXECUTE PLAN FOR '${plan.json}' only supports writing > to/reading from a local file without the scheme. We propose to extend the > support for Flink's FileSystem. > {code:sql} > -- before > COMPILE PLAN FOR '/tmp/foo/bar.json' > EXECUTE PLAN FOR '/tmp/foo/bar.json' > -- after > COMPILE PLAN FOR 'file:///tmp/foo/bar.json' > COMPILE PLAN FOR 'hdfs:///tmp/foo/bar.json' > COMPILE PLAN FOR 's3:///tmp/foo/bar.json' > COMPILE PLAN FOR 'oss:///tmp/foo/bar.json' > EXECUTE PLAN FOR 'file:///tmp/foo/bar.json' > EXECUTE PLAN FOR 'hdfs:///tmp/foo/bar.json' > EXECUTE PLAN FOR 's3:///tmp/foo/bar.json' > EXECUTE PLAN FOR 'oss:///tmp/foo/bar.json' {code}
[jira] [Updated] (FLINK-32219) SQL client hangs when executing EXECUTE PLAN
[ https://issues.apache.org/jira/browse/FLINK-32219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-32219: - Description: I compiled a plan for an insert statement and then executed the plan. However, the SQL client becomes unresponsive when executing the EXECUTE PLAN statement. I have checked the Flink dashboard and confirmed that the job is running normally. The only issue is that the SQL client is stuck and cannot accept new commands. Here is a part of the stack trace for reference.: {code:java} "pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 waiting on condition [0x000173e01000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00076e72af20> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83) at org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37) at org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106) at java.util.Iterator.forEachRemaining(Iterator.java:115) at org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115) at org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163) at org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440) at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) at org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown Source) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) {code} was: I compiled plan for insert statement firstly and then I execute the plan. However the sql client is pending after running execute plan statement. 
[jira] [Created] (FLINK-32219) sql client would be pending after executing plan of inserting
Shuai Xu created FLINK-32219: Summary: sql client would be pending after executing plan of inserting Key: FLINK-32219 URL: https://issues.apache.org/jira/browse/FLINK-32219 Project: Flink Issue Type: Bug Reporter: Shuai Xu I compiled plan for insert statement firstly and then I execute the plan. However the sql client is pending after running execute plan statement. Here is the part of stacktrace: {code:java} "pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 waiting on condition [0x000173e01000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00076e72af20> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83) at org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37) at org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106) at java.util.Iterator.forEachRemaining(Iterator.java:115) at org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115) at org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163) at org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195) at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) at org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown Source) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
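A reproduction sketch consistent with the description above (the plan path and table names are placeholders):

```sql
-- 1. Compile a plan for an INSERT statement
COMPILE PLAN '/tmp/insert.json' FOR
INSERT INTO sink_table SELECT * FROM source_table;

-- 2. Execute the compiled plan: the job is submitted and runs normally
--    (visible in the Flink dashboard), but the SQL client blocks on this
--    statement and accepts no further input
EXECUTE PLAN '/tmp/insert.json';
```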
[jira] [Updated] (FLINK-32219) sql client would be pending after executing plan of inserting
[ https://issues.apache.org/jira/browse/FLINK-32219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-32219: - Affects Version/s: 1.17.1 > sql client would be pending after executing plan of inserting > - > > Key: FLINK-32219 > URL: https://issues.apache.org/jira/browse/FLINK-32219 > Project: Flink > Issue Type: Bug >Affects Versions: 1.17.1 >Reporter: Shuai Xu >Priority: Major >
[jira] [Updated] (FLINK-32219) SQL client hangs when executing EXECUTE PLAN
[ https://issues.apache.org/jira/browse/FLINK-32219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-32219: - Description: I compiled a plan for an INSERT statement and executed the plan, but the SQL client became unresponsive when executing the EXECUTE PLAN statement. I confirmed that the Flink job is running normally by checking the Flink dashboard. The only issue is that the SQL client becomes stuck and cannot accept new commands. I printed the stack trace of the SQL client process, and here is a part of it for reference. {code:java} "pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 waiting on condition [0x000173e01000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00076e72af20> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83) at org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37) at org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106) at java.util.Iterator.forEachRemaining(Iterator.java:115) at org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115) at org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163) at org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542) at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) at org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown Source) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) {code} was: I compiled a plan for an insert statement and then executed the plan. However, the SQL client becomes unresponsive when executing the EXECUTE PLAN statement. I have checked the Flink dashboard and confirmed that the job is running normally. The only issue is that the SQL client is stuck and cannot accept new commands. 
Here is a part of the stack trace for reference: {code:java} "pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 waiting on condition [0x000173e01000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00076e72af20> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83) at
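From the trace, the client thread blocks in CompletableFuture.get() waiting for the first result of an unbounded streaming INSERT, a future that never completes. A minimal stand-alone sketch of that blocking pattern (hypothetical names, not the gateway's actual code), showing how a bounded wait would turn the silent hang into a diagnosable failure:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingFetchSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for the job's "first result available" future. For an
        // unbounded streaming INSERT this future never completes, so a plain
        // get() blocks the calling thread forever -- the behavior in the
        // stack trace above.
        CompletableFuture<String> firstRow = new CompletableFuture<>();

        try {
            // A timeout converts the indefinite hang into an explicit error
            // the client could report instead of freezing.
            firstRow.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("would have blocked indefinitely");
        }
    }
}
```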
[jira] [Commented] (FLINK-32824) Port Calcite's fix for the sql like operator
[ https://issues.apache.org/jira/browse/FLINK-32824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752618#comment-17752618 ] Shuai Xu commented on FLINK-32824: -- Hi lincoln, I'd like to fix this issue. Could you assign it to me? > Port Calcite's fix for the sql like operator > > > Key: FLINK-32824 > URL: https://issues.apache.org/jira/browse/FLINK-32824 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.18.0, 1.17.1 >Reporter: lincoln lee >Priority: Major > Fix For: 1.19.0 > > > We should port the bugfix of the SQL LIKE operator > https://issues.apache.org/jira/browse/CALCITE-1898 > {code} > The LIKE operator must match '.' (period) literally, not treat it as a > wild-card. Currently it treats it the same as '_'. > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
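To illustrate what the CALCITE-1898 fix changes: when a LIKE pattern is translated to a regex, '.' must be escaped so it matches only a literal period, while '%' and '_' remain the only wildcards. A simplified sketch of such a translation (not Calcite's actual implementation):

```java
import java.util.regex.Pattern;

public class LikeToRegex {
    // Convert a SQL LIKE pattern to a Java regex. The key point of the fix:
    // every character other than '%' and '_' -- including '.' -- is quoted
    // so it matches literally instead of acting as a regex wildcard.
    static String toRegex(String likePattern) {
        StringBuilder sb = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            switch (c) {
                case '%': sb.append(".*"); break; // SQL multi-character wildcard
                case '_': sb.append('.');  break; // SQL single-character wildcard
                default:  sb.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // '.' must match literally: 'a.c' matches the pattern, 'abc' does not.
        System.out.println("a.c".matches(toRegex("a.c"))); // true
        System.out.println("abc".matches(toRegex("a.c"))); // false
        System.out.println("abc".matches(toRegex("a_c"))); // true
    }
}
```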
[jira] [Created] (FLINK-34221) Introduce operator for minibatch join
Shuai Xu created FLINK-34221: Summary: Introduce operator for minibatch join Key: FLINK-34221 URL: https://issues.apache.org/jira/browse/FLINK-34221 Project: Flink Issue Type: Sub-task Components: Table SQL / Runtime Reporter: Shuai Xu Fix For: 1.19.0 Introduce an operator that implements minibatch join. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34222) Get minibatch join operator involved
Shuai Xu created FLINK-34222: Summary: Get minibatch join operator involved Key: FLINK-34222 URL: https://issues.apache.org/jira/browse/FLINK-34222 Project: Flink Issue Type: Sub-task Components: Table SQL / Runtime Reporter: Shuai Xu Fix For: 1.19.0 Get the minibatch join operator involved, which includes both the plan and the operator changes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34220) introduce buffer bundle for minibatch join
Shuai Xu created FLINK-34220: Summary: introduce buffer bundle for minibatch join Key: FLINK-34220 URL: https://issues.apache.org/jira/browse/FLINK-34220 Project: Flink Issue Type: Sub-task Components: Table SQL / Runtime Reporter: Shuai Xu Fix For: 1.19.0 Introduce a buffer bundle for storing records to implement minibatch join. -- This message was sent by Atlassian Jira (v8.20.10#820010)
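To sketch what such a buffer bundle buys (hypothetical record shapes, not the actual FLINK-34220 implementation): records are grouped per join key, and an insert immediately followed by its delete can be folded away before the join ever processes it, which is where the record reduction comes from:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BufferBundleSketch {
    // Records are (kind, key): "+I" insert, "-D" delete.
    record Rec(String kind, int key) {}

    // A per-key bundle that folds an insert immediately followed by its
    // delete into nothing, so the downstream join sees fewer records.
    static Map<Integer, List<Rec>> addToBundle(List<Rec> input) {
        Map<Integer, List<Rec>> bundle = new HashMap<>();
        for (Rec r : input) {
            List<Rec> perKey = bundle.computeIfAbsent(r.key(), k -> new ArrayList<>());
            if (r.kind().equals("-D") && !perKey.isEmpty()
                    && perKey.get(perKey.size() - 1).kind().equals("+I")) {
                perKey.remove(perKey.size() - 1); // fold the +I/-D pair away
            } else {
                perKey.add(r);
            }
        }
        return bundle;
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(new Rec("+I", 1), new Rec("-D", 1), new Rec("+I", 2));
        // Key 1's insert/delete pair cancels out; only key 2's record survives.
        System.out.println(addToBundle(input));
    }
}
```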
[jira] [Created] (FLINK-34219) Introduce a new join operator to support minibatch
Shuai Xu created FLINK-34219: Summary: Introduce a new join operator to support minibatch Key: FLINK-34219 URL: https://issues.apache.org/jira/browse/FLINK-34219 Project: Flink Issue Type: New Feature Components: Table SQL / Runtime Reporter: Shuai Xu Fix For: 1.19.0 This is the parent task of FLIP-415. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34355) Release Testing: Verify FLINK-34054 Support named parameters for functions and procedures
[ https://issues.apache.org/jira/browse/FLINK-34355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814193#comment-17814193 ] Shuai Xu edited comment on FLINK-34355 at 2/5/24 7:19 AM: -- Hi, I'd like to take this verification. cc [~hackergin] . was (Author: JIRAUSER300096): Hi, I'd like to take this verification. > Release Testing: Verify FLINK-34054 Support named parameters for functions > and procedures > - > > Key: FLINK-34355 > URL: https://issues.apache.org/jira/browse/FLINK-34355 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: lincoln lee >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > Test suggestion: > 1. Implement a test UDF or Procedure and support Named Parameters. > 2. When calling a function or procedure, use named parameters to verify if > the results are as expected. > You can test the following scenarios: > 1. Normal usage of named parameters, fully specifying each parameter. > 2. Omitting unnecessary parameters. > 3. Omitting necessary parameters to confirm if an error is reported. -- This message was sent by Atlassian Jira (v8.20.10#820010)
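The three scenarios in the test suggestion boil down to how named arguments are resolved against the declared parameter list. A plain-Java model of that resolution (hypothetical parameter names; not Flink's actual API), covering fully specified, optional-omitted, and required-omitted calls:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NamedParamsSketch {
    // Hypothetical parameter spec for a UDF: name -> required?
    static final LinkedHashMap<String, Boolean> PARAMS = new LinkedHashMap<>();
    static {
        PARAMS.put("in1", true);  // necessary parameter
        PARAMS.put("in2", false); // optional parameter
    }

    // Resolve named arguments to positional ones, mirroring the scenarios:
    // all parameters given, an optional omitted, a required omitted (error).
    static List<String> resolve(Map<String, String> named) {
        List<String> positional = new ArrayList<>();
        for (Map.Entry<String, Boolean> p : PARAMS.entrySet()) {
            String v = named.get(p.getKey());
            if (v == null && p.getValue()) {
                throw new IllegalArgumentException(
                    "Missing required parameter: " + p.getKey());
            }
            positional.add(v); // null stands for an omitted optional
        }
        return positional;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of("in1", "a", "in2", "b"))); // [a, b]
        System.out.println(resolve(Map.of("in1", "a")));             // [a, null]
        try {
            resolve(Map.of("in2", "b")); // required "in1" missing
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```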
[jira] [Commented] (FLINK-34355) Release Testing: Verify FLINK-34054 Support named parameters for functions and procedures
[ https://issues.apache.org/jira/browse/FLINK-34355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814193#comment-17814193 ] Shuai Xu commented on FLINK-34355: -- Hi, I'd like to take this verification. > Release Testing: Verify FLINK-34054 Support named parameters for functions > and procedures > - > > Key: FLINK-34355 > URL: https://issues.apache.org/jira/browse/FLINK-34355 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: lincoln lee >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > Test suggestion: > 1. Implement a test UDF or Procedure and support Named Parameters. > 2. When calling a function or procedure, use named parameters to verify if > the results are as expected. > You can test the following scenarios: > 1. Normal usage of named parameters, fully specifying each parameter. > 2. Omitting unnecessary parameters. > 3. Omitting necessary parameters to confirm if an error is reported. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34355) Release Testing: Verify FLINK-34054 Support named parameters for functions and procedures
[ https://issues.apache.org/jira/browse/FLINK-34355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818611#comment-17818611 ] Shuai Xu commented on FLINK-34355: -- Hi, I have finished this verification. > Release Testing: Verify FLINK-34054 Support named parameters for functions > and procedures > - > > Key: FLINK-34355 > URL: https://issues.apache.org/jira/browse/FLINK-34355 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Shuai Xu >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > Test suggestion: > 1. Implement a test UDF or Procedure and support Named Parameters. > 2. When calling a function or procedure, use named parameters to verify if > the results are as expected. > You can test the following scenarios: > 1. Normal usage of named parameters, fully specifying each parameter. > 2. Omitting unnecessary parameters. > 3. Omitting necessary parameters to confirm if an error is reported. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34346) Release Testing: Verify FLINK-24024 Support session Window TVF
[ https://issues.apache.org/jira/browse/FLINK-34346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818612#comment-17818612 ] Shuai Xu commented on FLINK-34346: -- Hi, I have finished this testing. An exception message that I think could be improved has been linked to this Jira. > Release Testing: Verify FLINK-24024 Support session Window TVF > -- > > Key: FLINK-34346 > URL: https://issues.apache.org/jira/browse/FLINK-34346 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: xuyang >Assignee: Shuai Xu >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > Session window TVF is ready. Users can use Session window TVF aggregation > instead of using legacy session group window aggregation. > Someone can verify this feature by following the > [doc]([https://github.com/apache/flink/pull/24250]) although it is still > being reviewed. > Furthermore, although session window join, session window rank and session > window deduplicate are in experimental state, if someone finds some bugs > about them, you could also open a Jira linked to this one to report them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34219) Introduce a new join operator to support minibatch
[ https://issues.apache.org/jira/browse/FLINK-34219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-34219: - Release Note: Support minibatch regular join to reduce intermediate results and mitigate record amplification in cascading join scenarios. > Introduce a new join operator to support minibatch > -- > > Key: FLINK-34219 > URL: https://issues.apache.org/jira/browse/FLINK-34219 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Shuai Xu >Assignee: Shuai Xu >Priority: Major > Fix For: 1.19.0 > > > This is the parent task of FLIP-415. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34462) Session window with negative parameter throws unclear exception
Shuai Xu created FLINK-34462: Summary: Session window with negative parameter throws unclear exception Key: FLINK-34462 URL: https://issues.apache.org/jira/browse/FLINK-34462 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.19.0 Reporter: Shuai Xu Setting an invalid parameter in a session window produces an unclear error. {code:java} // add test in WindowAggregateITCase def testEventTimeSessionWindowWithInvalidName(): Unit = { val sql = """ |SELECT | window_start, | window_end, | COUNT(*), | SUM(`bigdec`), | MAX(`double`), | MIN(`float`), | COUNT(DISTINCT `string`), | concat_distinct_agg(`string`) |FROM TABLE( | SESSION(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '-5' SECOND)) |GROUP BY window_start, window_end """.stripMargin val sink = new TestingAppendSink tEnv.sqlQuery(sql).toDataStream.addSink(sink) env.execute() } {code} {code:java} java.lang.AssertionError: Sql optimization: Assertion error: null at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:79) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:320) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:178) at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224) at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219) at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:151) at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:128) at org.apache.flink.table.api.bridge.scala.TableConversions.toDataStream(TableConversions.scala:60) at org.apache.flink.table.planner.runtime.stream.sql.WindowAggregateITCase.testEventTimeSessionWindowWithInvalidName(WindowAggregateITCase.scala:1239) at java.lang.reflect.Method.invoke(Method.java:498) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.Iterator.forEachRemaining(Iterator.java:116) at scala.collection.convert.Wrappers$IteratorWrapper.forEachRemaining(Wrappers.scala:26) at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at
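The AssertionError above surfaces deep inside the Volcano planner with no hint that the negative gap is the cause. One way to improve it, sketched here with hypothetical names rather than actual planner code, is an upfront validation of the session gap interval that fails with an actionable message:

```java
import java.time.Duration;

public class SessionGapValidation {
    // Hypothetical validation a session-window rule could run before
    // optimization, turning the opaque "Assertion error: null" into a
    // message that names the offending parameter.
    static void validateGap(Duration gap) {
        if (gap == null || gap.isNegative() || gap.isZero()) {
            throw new IllegalArgumentException(
                "SESSION window gap must be a positive interval, but was: " + gap);
        }
    }

    public static void main(String[] args) {
        try {
            validateGap(Duration.ofSeconds(-5)); // models INTERVAL '-5' SECOND
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```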
[jira] [Commented] (FLINK-34378) Minibatch join disrupted the original order of input records
[ https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818384#comment-17818384 ] Shuai Xu commented on FLINK-34378: -- Hi [~xuyangzhong]. This is expected behavior. To maintain order, additional data structures would need to be introduced, which would degrade performance, and the ordered effect would only materialize when parallelism is set to 1. If order preservation is required with a parallelism of 1, it suffices to simply turn off the minibatch feature. > Minibatch join disrupted the original order of input records > > > Key: FLINK-34378 > URL: https://issues.apache.org/jira/browse/FLINK-34378 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.19.0 > > > I'm not sure if it's a bug. The following case can reproduce this situation. > {code:java} > // add it in CalcITCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > row(1, "1"), > row(2, "2"), > row(3, "3"), > row(4, "4"), > row(5, "5"), > row(6, "6"), > row(7, "7"), > row(8, "8")) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" >|CREATE TABLE t1 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" >|CREATE TABLE t2 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) > println(tEnv.sqlQuery("SELECT * from
t1 join t2 on t1.a = t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > }{code} > Result > {code:java} > ++---+---+---+---+ > | op | a | b | a0| b0| > ++---+---+---+---+ > | +I | 3 | 3 | 3 | 3 | > | +I | 7 | 7 | 7 | 7 | > | +I | 2 | 2 | 2 | 2 | > | +I | 5 | 5 | 5 | 5 | > | +I | 1 | 1 | 1 | 1 | > | +I | 6 | 6 | 6 | 6 | > | +I | 4 | 4 | 4 | 4 | > | +I | 8 | 8 | 8 | 8 | > ++---+---+---+---+ > {code} > When I do not use minibatch join, the result is : > {code:java} > ++---+---+++ > | op | a | b | a0 | b0 | > ++---+---+++ > | +I | 1 | 1 | 1 | 1 | > | +I | 2 | 2 | 2 | 2 | > | +I | 3 | 3 | 3 | 3 | > | +I | 4 | 4 | 4 | 4 | > | +I | 5 | 5 | 5 | 5 | > | +I | 6 | 6 | 6 | 6 | > | +I | 7 | 7 | 7 | 7 | > | +I | 8 | 8 | 8 | 8 | > ++---+---+++ > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
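The reordering can be modeled without Flink at all: mini-batching buffers records per join key and flushes by walking the buffer, so output follows the buffer's iteration order rather than arrival order. A small sketch of that effect (hypothetical key function; not the actual operator):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MiniBatchOrderSketch {
    // Hypothetical stand-in for the join key's hash; not Flink's actual code.
    static int joinKey(int rec) { return rec * 31; }

    // Buffer one mini-batch per join key, then flush. The flush walks the
    // buffer's iteration order, not the arrival order, which is why joined
    // output can come back shuffled relative to the input.
    static List<Integer> bundleAndFlush(List<Integer> input) {
        Map<Integer, List<Integer>> bundle = new HashMap<>();
        for (int rec : input) {
            bundle.computeIfAbsent(joinKey(rec), k -> new ArrayList<>()).add(rec);
        }
        List<Integer> output = new ArrayList<>();
        for (List<Integer> perKey : bundle.values()) {
            output.addAll(perKey);
        }
        return output;
    }

    public static void main(String[] args) {
        // The same records come out, but generally not in arrival order.
        System.out.println(bundleAndFlush(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)));
    }
}
```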
[jira] [Created] (FLINK-33689) jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg
Shuai Xu created FLINK-33689: Summary: jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg Key: FLINK-33689 URL: https://issues.apache.org/jira/browse/FLINK-33689 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.18.0 Reporter: Shuai Xu Run the following test in sql/AggregateITCase with LocalGlobal and minibatch enabled. {code:java} def testGroupJsonObjectAggWithRetract(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) val sql = s""" |select | JSON_OBJECTAGG(key k value v) |from (select | cast(SUM(a) as string) as k,t as v | from | Table6 | group by t) |""".stripMargin val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) tEnv.createTemporaryView("Table6", t) val sink = new TestingRetractSink tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) env.execute() val expected = List( "{\"30\":2}" ) assertThat(sink.getRetractResults).isEqualTo(expected) } {code} The result is as follows. {code:java} List({"14":2,"30":2}) {code} However, \{"14":2} should be retracted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
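The failure mode reduces to the aggregate's retraction contract: under local-global aggregation, partial results emitted by the local agg are later retracted, so the function must remove the stale key again. A minimal model of that contract (not Flink's actual JsonObjectAggFunction):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ObjectAggSketch {
    // Minimal object-agg accumulator: a key -> value map with both
    // accumulate() and retract(), which a retractable aggregate must
    // honor under local-global aggregation.
    static final class Acc {
        final Map<String, Long> entries = new LinkedHashMap<>();

        void accumulate(String key, long value) { entries.put(key, value); }

        // Without this, the stale partial sum "14" from the ticket would
        // survive and the final result would be {"14":2,"30":2}.
        void retract(String key) { entries.remove(key); }
    }

    public static void main(String[] args) {
        Acc acc = new Acc();
        acc.accumulate("14", 2L); // partial SUM(a) while only some rows arrived
        acc.retract("14");        // upstream retracts the partial result
        acc.accumulate("30", 2L); // final SUM(a) over all rows
        System.out.println(acc.entries); // only the "30" entry should remain
    }
}
```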
[jira] [Updated] (FLINK-33689) jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg
[ https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-33689: - Description: Run the test as following and enable LocalGlobal in sql/AggregateITCase . {code:java} def testGroupJsonObjectAggWithRetract(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) val sql = s""" |select | JSON_OBJECTAGG(key k value v) |from (select | cast(SUM(a) as string) as k,t as v | from | Table6 | group by t) |""".stripMargin val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) tEnv.createTemporaryView("Table6", t) val sink = new TestingRetractSink tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) env.execute() val expected = List( "{\"30\":2}" ) assertThat(sink.getRetractResults).isEqualTo(expected) } {code} The result is as following. {code:java} List({"14":2,"30":2}) {code} However, \{"14":2} should be retracted. was: Run the test as following and enable LocalGlobal and minibatch in sql/AggregateITCase . 
{code:java} def testGroupJsonObjectAggWithRetract(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) val sql = s""" |select | JSON_OBJECTAGG(key k value v) |from (select | cast(SUM(a) as string) as k,t as v | from | Table6 | group by t) |""".stripMargin val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) tEnv.createTemporaryView("Table6", t) val sink = new TestingRetractSink tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) env.execute() val expected = List( "{\"30\":2}" ) assertThat(sink.getRetractResults).isEqualTo(expected) } {code} The result is as following. {code:java} List({"14":2,"30":2}) {code} However, \{"14":2} should be retracted. > jsonObjectAggFunction can't retract previous data which is invalid when > enable local global agg > --- > > Key: FLINK-33689 > URL: https://issues.apache.org/jira/browse/FLINK-33689 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Shuai Xu >Priority: Major > Labels: pull-request-available > > Run the test as following and enable LocalGlobal in sql/AggregateITCase . 
> {code:java} > def testGroupJsonObjectAggWithRetract(): Unit = { > val data = new mutable.MutableList[(Long, String, Long)] > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > val sql = > s""" >|select >| JSON_OBJECTAGG(key k value v) >|from (select >| cast(SUM(a) as string) as k,t as v >| from >| Table6 >| group by t) >|""".stripMargin > val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) > tEnv.createTemporaryView("Table6", t) > val sink = new TestingRetractSink > tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) > env.execute() > val expected = > List( > "{\"30\":2}" > ) > assertThat(sink.getRetractResults).isEqualTo(expected) > } {code} > The result is as following. > {code:java} > List({"14":2,"30":2}) {code} > However, \{"14":2} should be retracted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33689) jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg
[ https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-33689: - Description: Run the test as following and enable LocalGlobal and minibatch in sql/AggregateITCase . {code:java} def testGroupJsonObjectAggWithRetract(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) val sql = s""" |select | JSON_OBJECTAGG(key k value v) |from (select | cast(SUM(a) as string) as k,t as v | from | Table6 | group by t) |""".stripMargin val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) tEnv.createTemporaryView("Table6", t) val sink = new TestingRetractSink tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) env.execute() val expected = List( "{\"30\":2}" ) assertThat(sink.getRetractResults).isEqualTo(expected) } {code} The result is as following. {code:java} List({"14":2,"30":2}) {code} However, \{"14":2} should be retracted. was: Run the test as following and enable LocalGlobal and minibatch in sql/AggregateITCase . 
{code:java} def testGroupJsonObjectAggWithRetract(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) val sql = s""" |select | JSON_OBJECTAGG(key k value v) |from (select | cast(SUM(a) as string) as k,t as v | from | Table6 | group by t) |""".stripMargin val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) tEnv.createTemporaryView("Table6", t) val sink = new TestingRetractSink tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) env.execute() val expected = List( "{\"30\":2}" ) assertThat(sink.getRetractResults).isEqualTo(expected) } {code} The result is as following. {code:java} List({"14":2,"30":2}) {code} However, \{"14":2} should be retracted. > jsonObjectAggFunction can't retract previous data which is invalid when > enable local global agg > --- > > Key: FLINK-33689 > URL: https://issues.apache.org/jira/browse/FLINK-33689 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Shuai Xu >Priority: Major > Labels: pull-request-available > > Run the test as following and enable LocalGlobal and minibatch in > sql/AggregateITCase . 
> {code:java} > def testGroupJsonObjectAggWithRetract(): Unit = { > val data = new mutable.MutableList[(Long, String, Long)] > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > val sql = > s""" >|select >| JSON_OBJECTAGG(key k value v) >|from (select >| cast(SUM(a) as string) as k,t as v >| from >| Table6 >| group by t) >|""".stripMargin > val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) > tEnv.createTemporaryView("Table6", t) > val sink = new TestingRetractSink > tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) > env.execute() > val expected = > List( > "{\"30\":2}" > ) > assertThat(sink.getRetractResults).isEqualTo(expected) > } {code} > The result is as following. > {code:java} > List({"14":2,"30":2}) {code} > However, \{"14":2} should be retracted. -- This message was sent by Atlassian Jira
[jira] [Updated] (FLINK-33689) JsonObjectAggFunction can't retract previous data which is invalid when enable local global agg
[ https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-33689: - Summary: JsonObjectAggFunction can't retract previous data which is invalid when enable local global agg (was: jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg) > JsonObjectAggFunction can't retract previous data which is invalid when > enable local global agg > --- > > Key: FLINK-33689 > URL: https://issues.apache.org/jira/browse/FLINK-33689 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Shuai Xu >Priority: Major > Labels: pull-request-available > > Run the test as following and enable LocalGlobal in sql/AggregateITCase . > {code:java} > def testGroupJsonObjectAggWithRetract(): Unit = { > val data = new mutable.MutableList[(Long, String, Long)] > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > val sql = > s""" >|select >| JSON_OBJECTAGG(key k value v) >|from (select >| cast(SUM(a) as string) as k,t as v >| from >| Table6 >| group by t) >|""".stripMargin > val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) > tEnv.createTemporaryView("Table6", t) > val sink = new TestingRetractSink > tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) > env.execute() > val expected = > List( > "{\"30\":2}" > ) > assertThat(sink.getRetractResults).isEqualTo(expected) > } {code} > The result is as following. > {code:java} > List({"14":2,"30":2}) {code} > However, \{"14":2} should be retracted. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33689) JsonObjectAggFunction can't retract previous data which is invalid when enabling local global agg
[ https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-33689: - Summary: JsonObjectAggFunction can't retract previous data which is invalid when enabling local global agg (was: JsonObjectAggFunction can't retract previous data which is invalid when enable local global agg) > JsonObjectAggFunction can't retract previous data which is invalid when > enabling local global agg > - > > Key: FLINK-33689 > URL: https://issues.apache.org/jira/browse/FLINK-33689 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Shuai Xu >Priority: Major > Labels: pull-request-available > > Run the test as following and enable LocalGlobal in sql/AggregateITCase . > {code:java} > def testGroupJsonObjectAggWithRetract(): Unit = { > val data = new mutable.MutableList[(Long, String, Long)] > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > val sql = > s""" >|select >| JSON_OBJECTAGG(key k value v) >|from (select >| cast(SUM(a) as string) as k,t as v >| from >| Table6 >| group by t) >|""".stripMargin > val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) > tEnv.createTemporaryView("Table6", t) > val sink = new TestingRetractSink > tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) > env.execute() > val expected = > List( > "{\"30\":2}" > ) > assertThat(sink.getRetractResults).isEqualTo(expected) > } {code} > The result is as following. > {code:java} > List({"14":2,"30":2}) {code} > However, \{"14":2} should be retracted. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33549) Exception "Factory does not implement interface YieldingOperatorFactory" thrown in batch mode
[ https://issues.apache.org/jira/browse/FLINK-33549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788967#comment-17788967 ] Shuai Xu commented on FLINK-33549: -- I'd like to fix this. Could you assign it to me? [~luoyuxia] > Exception "Factory does not implement interface YieldingOperatorFactory" > thrown in batch mode > -- > > Key: FLINK-33549 > URL: https://issues.apache.org/jira/browse/FLINK-33549 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Reporter: luoyuxia >Priority: Major > > When running a job in batch mode, it throws the following exception > {code:java} > java.lang.NullPointerException: Factory does not implement interface > org.apache.flink.streaming.api.operators.YieldingOperatorFactory > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory.getMailboxExecutor(AbstractStreamOperatorFactory.java:67) > at > org.apache.flink.table.runtime.operators.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:79) > at > org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.createOperator(TableOperatorWrapper.java:115) > at > org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.createAllOperators(MultipleInputStreamOperatorBase.java:259) > at > org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.<init>(MultipleInputStreamOperatorBase.java:88) > at > org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.<init>(BatchMultipleInputStreamOperator.java:48) > at > org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory.createStreamOperator(BatchMultipleInputStreamOperatorFactory.java:51) > at > org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:212) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:756) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:743) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) > at java.lang.Thread.run(Thread.java:834) {code} > > When I disable multiple-input by setting > table.optimizer.multiple-input-enabled = false, it works. > Should be introduced by FLINK-23621. > [In > here|https://github.com/apache/flink/blob/a1aed4f877099328d4833f8a2781d2edbaaddc70/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java#L60], > when the operator factory is an instance of YieldingOperatorFactory, the mailbox > executor is set. But when it is a > BatchMultipleInputStreamOperatorFactory, although the mailbox executor is > still set on the factory itself, it is not set for the operators wrapped by > the BatchMultipleInputStreamOperator. Then the exception is thrown. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
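The missing propagation described above can be modeled with a few hypothetical stand-in types (names echo Flink's classes, but none of this is Flink's real API): the wrapper factory receives a mailbox executor, and the fix idea is to forward it to every wrapped factory that needs one before asking it to create its operator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal model of the bug; not Flink code.
public class MailboxPropagationSketch {
    interface OperatorFactory { Object createStreamOperator(); }

    // A "yielding" factory needs a mailbox executor before it can create its operator.
    static class YieldingFactory implements OperatorFactory {
        Runnable mailboxExecutor; // stand-in for Flink's MailboxExecutor
        void setMailboxExecutor(Runnable e) { this.mailboxExecutor = e; }
        public Object createStreamOperator() {
            if (mailboxExecutor == null) {
                // mirrors the reported "Factory does not implement interface
                // YieldingOperatorFactory" NPE from the precondition check
                throw new NullPointerException("mailbox executor was never set");
            }
            return new Object();
        }
    }

    // The wrapper factory receives the executor, but must also forward it to
    // the factories it wraps -- the step the bug report says is missing.
    static class MultipleInputFactory implements OperatorFactory {
        final List<OperatorFactory> wrapped = new ArrayList<>();
        Runnable mailboxExecutor;
        public Object createStreamOperator() {
            for (OperatorFactory f : wrapped) {
                if (f instanceof YieldingFactory) {
                    ((YieldingFactory) f).setMailboxExecutor(mailboxExecutor); // the fix idea
                }
                f.createStreamOperator();
            }
            return new Object();
        }
    }

    public static boolean runsWithoutNpe() {
        MultipleInputFactory outer = new MultipleInputFactory();
        outer.mailboxExecutor = () -> {};
        outer.wrapped.add(new YieldingFactory()); // e.g. an async-wait operator
        try {
            outer.createStreamOperator();
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) { System.out.println(runsWithoutNpe()); }
}
```

Removing the `setMailboxExecutor` line in the loop reproduces the failure mode: the inner factory's precondition check fires exactly as in the stack trace.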
[jira] [Comment Edited] (FLINK-33549) Exception "Factory does not implement interface YieldingOperatorFactory" thrown in batch mode
[ https://issues.apache.org/jira/browse/FLINK-33549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788967#comment-17788967 ] Shuai Xu edited comment on FLINK-33549 at 11/23/23 3:39 AM: Hi [~luoyuxia] , I'd like to fix this. Could you assign it to me? was (Author: JIRAUSER300096): I'd like to fix this. Could you assign it to me? [~luoyuxia] > Exception "Factory does not implement interface YieldingOperatorFactory" > thrown in batch mode > -- > > Key: FLINK-33549 > URL: https://issues.apache.org/jira/browse/FLINK-33549 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Reporter: luoyuxia >Priority: Major > > When running a job in batch mode, it throws the following exception > {code:java} > java.lang.NullPointerException: Factory does not implement interface > org.apache.flink.streaming.api.operators.YieldingOperatorFactory > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory.getMailboxExecutor(AbstractStreamOperatorFactory.java:67) > at > org.apache.flink.table.runtime.operators.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:79) > at > org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.createOperator(TableOperatorWrapper.java:115) > at > org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.createAllOperators(MultipleInputStreamOperatorBase.java:259) > at > org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.<init>(MultipleInputStreamOperatorBase.java:88) > at > org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.<init>(BatchMultipleInputStreamOperator.java:48) > at > org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory.createStreamOperator(BatchMultipleInputStreamOperatorFactory.java:51) > at > org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:212) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:756) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:743) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) > at java.lang.Thread.run(Thread.java:834) {code} > > When I disable multiple-input by setting > table.optimizer.multiple-input-enabled = false, it works. > Should be introduced by FLINK-23621. > [In > here|https://github.com/apache/flink/blob/a1aed4f877099328d4833f8a2781d2edbaaddc70/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java#L60], > when the operator factory is an instance of YieldingOperatorFactory, the mailbox > executor is set. But when it is a > BatchMultipleInputStreamOperatorFactory, although the mailbox executor is > still set on the factory itself, it is not set for the operators wrapped by > the BatchMultipleInputStreamOperator. Then the exception is thrown. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34222) Get minibatch join operator involved
[ https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-34222: - Description: Get minibatch join operator involved in which includes both plan and operator. Implement minibatch join in E2E. (was: Get minibatch join operator involved which includes both plan and operator) > Get minibatch join operator involved > > > Key: FLINK-34222 > URL: https://issues.apache.org/jira/browse/FLINK-34222 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Reporter: Shuai Xu >Priority: Major > Fix For: 1.19.0 > > > Get minibatch join operator involved in which includes both plan and > operator. Implement minibatch join in E2E. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34256) Add a documentation section for minibatch join
Shuai Xu created FLINK-34256: Summary: Add a documentation section for minibatch join Key: FLINK-34256 URL: https://issues.apache.org/jira/browse/FLINK-34256 Project: Flink Issue Type: Sub-task Components: Documentation Affects Versions: 1.19.0 Reporter: Shuai Xu We should add a minibatch join section in Performance Tuning to explain the usage and principle of minibatch-join. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34222) End to end implementation of minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-34222: - Description: Implement minibatch join in E2E which includes both plan and runtime parts. (was: Get minibatch join operator involved in which includes both plan and operator. Implement minibatch join in E2E.) Summary: End to end implementation of minibatch join (was: Get minibatch join operator involved) > End to end implementation of minibatch join > --- > > Key: FLINK-34222 > URL: https://issues.apache.org/jira/browse/FLINK-34222 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Reporter: Shuai Xu >Assignee: Shuai Xu >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Implement minibatch join in E2E which includes both plan and runtime parts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34222) Supports mini-batch for streaming regular join
[ https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-34222: - Summary: Supports mini-batch for streaming regular join (was: End to end implementation of minibatch join) > Supports mini-batch for streaming regular join > -- > > Key: FLINK-34222 > URL: https://issues.apache.org/jira/browse/FLINK-34222 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Reporter: Shuai Xu >Assignee: Shuai Xu >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Implement minibatch join in E2E which includes both plan and runtime parts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34300) Release Testing Instructions: Verify FLINK-24024 Support session Window TVF
[ https://issues.apache.org/jira/browse/FLINK-34300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814016#comment-17814016 ] Shuai Xu commented on FLINK-34300: -- Hi, [~xuyangzhong]. I'd like to take this verification. > Release Testing Instructions: Verify FLINK-24024 Support session Window TVF > --- > > Key: FLINK-34300 > URL: https://issues.apache.org/jira/browse/FLINK-34300 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: xuyang >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34349) Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch
[ https://issues.apache.org/jira/browse/FLINK-34349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-34349: - Description: Minibatch join is ready. Users can improve performance in regular stream join scenarios. This feature can be verified by following the [doc|https://github.com/apache/flink/pull/24240], although it is still under review. If anyone finds bugs in this feature, please open a Jira issue linked to this one to report them. > Release Testing: Verify FLINK-34219 Introduce a new join operator to support > minibatch > -- > > Key: FLINK-34349 > URL: https://issues.apache.org/jira/browse/FLINK-34349 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: Shuai Xu >Assignee: Shuai Xu >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > Minibatch join is ready. Users can improve performance in regular stream > join scenarios. > This feature can be verified by following the > [doc|https://github.com/apache/flink/pull/24240], although it is still > under review. > If anyone finds bugs in this feature, please open a Jira issue linked to this > one to report them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
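For intuition, the core mini-batch idea being verified here — buffering input records per join key and folding an insert/delete pair inside the batch so the pair never touches state — can be sketched roughly as follows (an illustrative sketch only; Flink's actual bundle classes and record types differ):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative mini-batch bundle: buffers +I/-D records per join key and folds
// a -D against a buffered +I of the same row, so the pair never reaches state.
// A sketch of the general idea, not Flink's actual bundle implementation.
public class MiniBatchBundleSketch {
    private final Map<String, List<String>> bundle = new HashMap<>();

    public void addInsert(String key, String row) {
        bundle.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
    }

    public void addDelete(String key, String row) {
        List<String> rows = bundle.get(key);
        if (rows != null && rows.remove(row)) {
            return; // folded: the earlier +I is cancelled in memory
        }
        // otherwise the delete would be forwarded to state on flush (omitted here)
    }

    // number of records that would actually be flushed to the join state
    public int pendingFor(String key) {
        List<String> rows = bundle.get(key);
        return rows == null ? 0 : rows.size();
    }

    public static int demo() {
        MiniBatchBundleSketch b = new MiniBatchBundleSketch();
        b.addInsert("1", "(1, 1)");
        b.addInsert("1", "(1, 2)");
        b.addDelete("1", "(1, 1)"); // cancels the first insert inside the batch
        return b.pendingFor("1");
    }

    public static void main(String[] args) { System.out.println(demo()); }
}
```

Only one of the three buffered records survives the batch, which is where the state-access savings of the feature come from.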
[jira] (FLINK-34304) Release Testing Instructions: Verify FLINK-34219 Introduce a new join operator to support minibatch
[ https://issues.apache.org/jira/browse/FLINK-34304 ] Shuai Xu deleted comment on FLINK-34304: -- was (Author: JIRAUSER300096): Minibatch join is ready. Users could improve performance in regular stream join scenarios. Someone can verify this feature by following the [doc]([https://github.com/apache/flink/pull/24240)] although it is still being reviewed. If someone finds some bugs about this feature, you open a Jira linked this one to report them. > Release Testing Instructions: Verify FLINK-34219 Introduce a new join > operator to support minibatch > --- > > Key: FLINK-34304 > URL: https://issues.apache.org/jira/browse/FLINK-34304 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Shuai Xu >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34304) Release Testing Instructions: Verify FLINK-34219 Introduce a new join operator to support minibatch
[ https://issues.apache.org/jira/browse/FLINK-34304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814015#comment-17814015 ] Shuai Xu commented on FLINK-34304: -- Minibatch join is ready. Users can improve performance in regular stream join scenarios. This feature can be verified by following the [doc|https://github.com/apache/flink/pull/24240], although it is still under review. If anyone finds bugs in this feature, please open a Jira issue linked to this one to report them. > Release Testing Instructions: Verify FLINK-34219 Introduce a new join > operator to support minibatch > --- > > Key: FLINK-34304 > URL: https://issues.apache.org/jira/browse/FLINK-34304 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Shuai Xu >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34349) Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch
Shuai Xu created FLINK-34349: Summary: Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch Key: FLINK-34349 URL: https://issues.apache.org/jira/browse/FLINK-34349 Project: Flink Issue Type: Sub-task Components: Table SQL / Runtime Affects Versions: 1.19.0 Reporter: Shuai Xu Assignee: Shuai Xu Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34304) Release Testing Instructions: Verify FLINK-34219 Introduce a new join operator to support minibatch
[ https://issues.apache.org/jira/browse/FLINK-34304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814192#comment-17814192 ] Shuai Xu commented on FLINK-34304: -- This test is now tracked in FLINK-34349, so this issue will be closed. > Release Testing Instructions: Verify FLINK-34219 Introduce a new join > operator to support minibatch > --- > > Key: FLINK-34304 > URL: https://issues.apache.org/jira/browse/FLINK-34304 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Shuai Xu >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34694) Delete num of associations for streaming outer join
[ https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835668#comment-17835668 ] Shuai Xu edited comment on FLINK-34694 at 4/10/24 9:43 AM: --- Hi [~rovboyko], The method `otherRecordHasNoAssociationsInInputSide` in your code would be invoked for every associatedRecord, which indeed increases the overhead of state access. It is difficult to say which weighs more: the increased cost of these lookups or the savings from removing the 'updateNumOfAssociations()' calls you mentioned. Intuitively, this may depend on the data distribution itself, so a detailed test report would illustrate the problem better, and a comparison table covering the JOIN queries of Nexmark would be good. Besides, rewriting the SQL so that it hits this optimization can also show the scenarios in which the optimization takes effect. was (Author: JIRAUSER300096): Hi [~rovboyko], The method `otherRecordHasNoAssociationsInInputSide` in your code would be invoked for every associatedRecord. This indeed increases the overhead of state access. It is difficult to say which one has a greater proportion between the increased costs and the reduced expenses of the method 'updateNumOfAssociations()' you mentioned. Intuitively, this may depend on the data distribution itself. So a detailed test report could better illustrate the problem. And a comparison table that covers JOIN keyword in queries of nexmark is good. Besides this, rewrite sql for hitting this optimization can also indicate the scenarios in which this optimization takes effect. 
> Delete num of associations for streaming outer join > --- > > Key: FLINK-34694 > URL: https://issues.apache.org/jira/browse/FLINK-34694 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Roman Boyko >Priority: Major > Attachments: image-2024-03-15-19-51-29-282.png, > image-2024-03-15-19-52-24-391.png > > > Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the > OuterJoinRecordStateView is used to store additional field - the number of > associations for every record. This leads to store additional Tuple2 and > Integer data for every record in outer state. > This functionality is used only for sending: > * -D[nullPaddingRecord] in case of first Accumulate record > * +I[nullPaddingRecord] in case of last Revoke record > The overhead of storing additional data and updating the counter for > associations can be avoided by checking the input state for these events. > > The proposed solution can be found here - > [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423] > > According to the nexmark q20 test (changed to OUTER JOIN) it could increase > the performance up to 20%: > * Before: > !image-2024-03-15-19-52-24-391.png! > * After: > !image-2024-03-15-19-51-29-282.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join
[ https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835648#comment-17835648 ] Shuai Xu commented on FLINK-34694: -- Hi [~rovboyko] , your idea looks interesting. Actually, after reading your code, I found that this optimization does not reduce the overhead of state access; rather, it reduces the state size to some extent. IMO, the marginal reduction in size may not significantly impact the overhead of storage, given that it constitutes a small fraction relative to the records held in the state. BTW, if you plan to pursue this optimization further, could you provide more comprehensive benchmark details? Benchmark results from multiple runs and the overall performance across all queries would be convincing. > Delete num of associations for streaming outer join > --- > > Key: FLINK-34694 > URL: https://issues.apache.org/jira/browse/FLINK-34694 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Roman Boyko >Priority: Major > Attachments: image-2024-03-15-19-51-29-282.png, > image-2024-03-15-19-52-24-391.png > > > Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the > OuterJoinRecordStateView is used to store additional field - the number of > associations for every record. This leads to store additional Tuple2 and > Integer data for every record in outer state. > This functionality is used only for sending: > * -D[nullPaddingRecord] in case of first Accumulate record > * +I[nullPaddingRecord] in case of last Revoke record > The overhead of storing additional data and updating the counter for > associations can be avoided by checking the input state for these events. > > The proposed solution can be found here - > [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423] > > According to the nexmark q20 test (changed to OUTER JOIN) it could increase > the performance up to 20%: > * Before: > !image-2024-03-15-19-52-24-391.png! 
> * After: > !image-2024-03-15-19-51-29-282.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join
[ https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835668#comment-17835668 ] Shuai Xu commented on FLINK-34694: -- Hi [~rovboyko], The method `otherRecordHasNoAssociationsInInputSide` in your code would be invoked for every associatedRecord, which indeed increases the overhead of state access. It is difficult to say which weighs more: the increased cost of these lookups or the savings from removing the 'updateNumOfAssociations()' calls you mentioned. Intuitively, this may depend on the data distribution itself, so a detailed test report would illustrate the problem better, and a comparison table covering the JOIN queries of Nexmark would be good. Besides this, rewriting the SQL so that it hits this optimization can also show the scenarios in which the optimization takes effect. > Delete num of associations for streaming outer join > --- > > Key: FLINK-34694 > URL: https://issues.apache.org/jira/browse/FLINK-34694 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Roman Boyko >Priority: Major > Attachments: image-2024-03-15-19-51-29-282.png, > image-2024-03-15-19-52-24-391.png > > > Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the > OuterJoinRecordStateView is used to store additional field - the number of > associations for every record. This leads to store additional Tuple2 and > Integer data for every record in outer state. > This functionality is used only for sending: > * -D[nullPaddingRecord] in case of first Accumulate record > * +I[nullPaddingRecord] in case of last Revoke record > The overhead of storing additional data and updating the counter for > associations can be avoided by checking the input state for these events. 
> > The proposed solution can be found here - > [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423] > > According to the nexmark q20 test (changed to OUTER JOIN) it could increase > the performance up to 20%: > * Before: > !image-2024-03-15-19-52-24-391.png! > * After: > !image-2024-03-15-19-51-29-282.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
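The role of the per-record association counter that this proposal removes can be sketched in miniature (simplified to a single left-side record, with emitted changelog rows as strings; this is not Flink's actual OuterJoinRecordStateView): the counter is what decides when the null-padded result row must be emitted or retracted.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of why an outer-join state view tracks a number-of-associations per
// record: the counter decides when the null-padded result is emitted or
// retracted. Simplified to one left record; not Flink's actual state layout.
public class OuterJoinCounterSketch {
    private int numAssociations = 0;          // stored next to the left record
    private final List<String> emitted = new ArrayList<>();

    public void onFirstLeftRecord() {
        emitted.add("+I[left, NULL]");        // no match yet: null padding
    }

    public void onRightAccumulate() {
        if (numAssociations == 0) {
            emitted.add("-D[left, NULL]");    // first match: retract the padding
        }
        numAssociations++;
        emitted.add("+I[left, right]");
    }

    public void onRightRetract() {
        numAssociations--;
        emitted.add("-D[left, right]");
        if (numAssociations == 0) {
            emitted.add("+I[left, NULL]");    // last match gone: pad again
        }
    }

    public static List<String> demo() {
        OuterJoinCounterSketch j = new OuterJoinCounterSketch();
        j.onFirstLeftRecord();
        j.onRightAccumulate();
        j.onRightRetract();
        return j.emitted;
    }

    public static void main(String[] args) { System.out.println(demo()); }
}
```

The proposal replaces the stored counter with a probe of the other side's input state at the two decision points above, which is exactly where the extra state lookups discussed in the comments come from.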
[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator
[ https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839464#comment-17839464 ] Shuai Xu commented on FLINK-35184: -- Hi [~rovboyko] , thanks for reporting this bug, which is caused by the hashCode() in GenericRowData. Could you please give a rough explanation of your solution before implementing it? > Hash collision inside MiniBatchStreamingJoin operator > - > > Key: FLINK-35184 > URL: https://issues.apache.org/jira/browse/FLINK-35184 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: Roman Boyko >Priority: Major > > The hash collision is possible for InputSideHasNoUniqueKeyBundle. To > reproduce it just launch the following test within > StreamingMiniBatchJoinOperatorTest: > > {code:java} > @Tag("miniBatchSize=6") > @Test > public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) > throws Exception { > leftTypeInfo = > InternalTypeInfo.of( > RowType.of( > new LogicalType[] {new IntType(), new > BigIntType()}, > new String[] {"id1", "val1"})); > rightTypeInfo = > InternalTypeInfo.of( > RowType.of( > new LogicalType[] {new IntType(), new > BigIntType()}, > new String[] {"id2", "val2"})); > leftKeySelector = > HandwrittenSelectorUtil.getRowDataSelector( > new int[] {0}, > leftTypeInfo.toRowType().getChildren().toArray(new > LogicalType[0])); > rightKeySelector = > HandwrittenSelectorUtil.getRowDataSelector( > new int[] {0}, > rightTypeInfo.toRowType().getChildren().toArray(new > LogicalType[0])); > joinKeyTypeInfo = InternalTypeInfo.of(new IntType()); > super.beforeEach(testInfo); > testHarness.setStateTtlProcessingTime(1); > testHarness.processElement2(insertRecord(1, 1L)); > testHarness.processElement1(insertRecord(1, 4294967296L)); > testHarness.processElement2(insertRecord(1, 4294967296L)); > testHarness.processElement2(deleteRecord(1, 1L)); > testHarness.close(); > assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L)); > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
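The collision in the test above is easy to verify directly: 4294967296 is 2^32, and `Long.hashCode` XOR-folds the upper and lower 32 bits, so 4294967296L hashes to the same value as 1L. Any field-wise row hash built on top of it (sketched below with `Arrays.hashCode` as a stand-in for the row-level scheme the comment attributes to GenericRowData) therefore conflates the rows (1, 1L) and (1, 4294967296L).

```java
import java.util.Arrays;

// Demonstrates the collision from the test above: (1, 1L) and (1, 4294967296L)
// get identical hash codes under a field-wise hash, because Long.hashCode
// XOR-folds the two 32-bit halves of the value. A bundle keyed purely by hash
// (without full-row equality) cannot tell the two rows apart.
public class HashCollisionDemo {
    // field-wise row hash; a stand-in, not GenericRowData's actual code
    public static int rowHash(Object... fields) {
        return Arrays.hashCode(fields);
    }

    public static void main(String[] args) {
        System.out.println(Long.hashCode(1L));                          // 1
        System.out.println(Long.hashCode(4294967296L));                 // 2^32 also hashes to 1
        System.out.println(rowHash(1, 1L) == rowHash(1, 4294967296L)); // true
    }
}
```

This is why the fix direction discussed later in the thread is to key the bundle on full-row equality rather than on hash codes alone.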
[jira] [Comment Edited] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator
[ https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839947#comment-17839947 ] Shuai Xu edited comment on FLINK-35184 at 4/23/24 5:46 AM: --- [~rovboyko] , absolutely, please feel free to start the implementation. was (Author: JIRAUSER300096): Absolutely, please feel free to start the implementation. > Hash collision inside MiniBatchStreamingJoin operator > - > > Key: FLINK-35184 > URL: https://issues.apache.org/jira/browse/FLINK-35184 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: Roman Boyko >Priority: Major > > The hash collision is possible for InputSideHasNoUniqueKeyBundle. To > reproduce it just launch the following test within > StreamingMiniBatchJoinOperatorTest: > > {code:java} > @Tag("miniBatchSize=6") > @Test > public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) > throws Exception { > leftTypeInfo = > InternalTypeInfo.of( > RowType.of( > new LogicalType[] {new IntType(), new > BigIntType()}, > new String[] {"id1", "val1"})); > rightTypeInfo = > InternalTypeInfo.of( > RowType.of( > new LogicalType[] {new IntType(), new > BigIntType()}, > new String[] {"id2", "val2"})); > leftKeySelector = > HandwrittenSelectorUtil.getRowDataSelector( > new int[] {0}, > leftTypeInfo.toRowType().getChildren().toArray(new > LogicalType[0])); > rightKeySelector = > HandwrittenSelectorUtil.getRowDataSelector( > new int[] {0}, > rightTypeInfo.toRowType().getChildren().toArray(new > LogicalType[0])); > joinKeyTypeInfo = InternalTypeInfo.of(new IntType()); > super.beforeEach(testInfo); > testHarness.setStateTtlProcessingTime(1); > testHarness.processElement2(insertRecord(1, 1L)); > testHarness.processElement1(insertRecord(1, 4294967296L)); > testHarness.processElement2(insertRecord(1, 4294967296L)); > testHarness.processElement2(deleteRecord(1, 1L)); > testHarness.close(); > assertor.shouldEmit(testHarness, 
rowOfKind(RowKind.INSERT, 1, > 4294967296L, 1, 4294967296L)); > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator
[ https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839947#comment-17839947 ] Shuai Xu commented on FLINK-35184: -- Absolutely, please feel free to start the implementation. > Hash collision inside MiniBatchStreamingJoin operator > - > > Key: FLINK-35184 > URL: https://issues.apache.org/jira/browse/FLINK-35184 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: Roman Boyko >Priority: Major > > The hash collision is possible for InputSideHasNoUniqueKeyBundle. To > reproduce it just launch the following test within > StreamingMiniBatchJoinOperatorTest: > > {code:java} > @Tag("miniBatchSize=6") > @Test > public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) > throws Exception { > leftTypeInfo = > InternalTypeInfo.of( > RowType.of( > new LogicalType[] {new IntType(), new > BigIntType()}, > new String[] {"id1", "val1"})); > rightTypeInfo = > InternalTypeInfo.of( > RowType.of( > new LogicalType[] {new IntType(), new > BigIntType()}, > new String[] {"id2", "val2"})); > leftKeySelector = > HandwrittenSelectorUtil.getRowDataSelector( > new int[] {0}, > leftTypeInfo.toRowType().getChildren().toArray(new > LogicalType[0])); > rightKeySelector = > HandwrittenSelectorUtil.getRowDataSelector( > new int[] {0}, > rightTypeInfo.toRowType().getChildren().toArray(new > LogicalType[0])); > joinKeyTypeInfo = InternalTypeInfo.of(new IntType()); > super.beforeEach(testInfo); > testHarness.setStateTtlProcessingTime(1); > testHarness.processElement2(insertRecord(1, 1L)); > testHarness.processElement1(insertRecord(1, 4294967296L)); > testHarness.processElement2(insertRecord(1, 4294967296L)); > testHarness.processElement2(deleteRecord(1, 1L)); > testHarness.close(); > assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, > 4294967296L, 1, 4294967296L)); > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator
[ https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839920#comment-17839920 ] Shuai Xu commented on FLINK-35184: -- Hi [~rovboyko] , actually hash collisions can't be avoided even by using BinaryRowData, which can only reduce the probability to some extent. And the solution you mentioned works for me. > Hash collision inside MiniBatchStreamingJoin operator > - > > Key: FLINK-35184 > URL: https://issues.apache.org/jira/browse/FLINK-35184 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: Roman Boyko >Priority: Major > > The hash collision is possible for InputSideHasNoUniqueKeyBundle. To > reproduce it just launch the following test within > StreamingMiniBatchJoinOperatorTest: > > {code:java} > @Tag("miniBatchSize=6") > @Test > public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) > throws Exception { > leftTypeInfo = > InternalTypeInfo.of( > RowType.of( > new LogicalType[] {new IntType(), new > BigIntType()}, > new String[] {"id1", "val1"})); > rightTypeInfo = > InternalTypeInfo.of( > RowType.of( > new LogicalType[] {new IntType(), new > BigIntType()}, > new String[] {"id2", "val2"})); > leftKeySelector = > HandwrittenSelectorUtil.getRowDataSelector( > new int[] {0}, > leftTypeInfo.toRowType().getChildren().toArray(new > LogicalType[0])); > rightKeySelector = > HandwrittenSelectorUtil.getRowDataSelector( > new int[] {0}, > rightTypeInfo.toRowType().getChildren().toArray(new > LogicalType[0])); > joinKeyTypeInfo = InternalTypeInfo.of(new IntType()); > super.beforeEach(testInfo); > testHarness.setStateTtlProcessingTime(1); > testHarness.processElement2(insertRecord(1, 1L)); > testHarness.processElement1(insertRecord(1, 4294967296L)); > testHarness.processElement2(insertRecord(1, 4294967296L)); > testHarness.processElement2(deleteRecord(1, 1L)); > testHarness.close(); > assertor.shouldEmit(testHarness, 
rowOfKind(RowKind.INSERT, 1, > 4294967296L, 1, 4294967296L)); > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
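The pair of values used in the test above, 1L and 4294967296L, collide because `java.lang.Long` folds its upper 32 bits into a 32-bit hash. A minimal standalone illustration of that collision (plain Java, not Flink code):

```java
// Long.hashCode(v) is (int) (v ^ (v >>> 32)), so 1L and 2^32 (4294967296L)
// both hash to 1 -- the same collision the test above provokes in the
// no-unique-key bundle's hash map.
public class LongHashCollision {
    public static void main(String[] args) {
        long a = 1L;
        long b = 4294967296L; // 2^32
        System.out.println(Long.hashCode(a)); // prints 1
        System.out.println(Long.hashCode(b)); // prints 1 -> collision with a
    }
}
```

This is also why switching the bundle key representation only shifts which values collide: any mapping from 64 bits down to 32 must collide somewhere.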
[jira] [Commented] (FLINK-35230) Split FlinkSqlParserImplTest to reduce the code lines.
[ https://issues.apache.org/jira/browse/FLINK-35230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840623#comment-17840623 ] Shuai Xu commented on FLINK-35230: -- [~lsy] I'd like to take this, would you assign it to me? > Split FlinkSqlParserImplTest to reduce the code lines. > -- > > Key: FLINK-35230 > URL: https://issues.apache.org/jira/browse/FLINK-35230 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / API >Reporter: Feng Jin >Priority: Major > > With the increasing extension of Calcite syntax, the current > FlinkSqlParserImplTest has reached nearly 3000 lines of code. > If it exceeds the current limit, it will result in errors in the code style > check. > {code:java} > 08:33:19.679 [ERROR] > src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:[1] > (sizes) FileLength: File length is 3,166 lines (max allowed is 3,100). > {code} > To facilitate future syntax extensions, I suggest that we split > FlinkSqlParserImplTest and place the same type of syntax in separate Java > tests for the convenience of avoiding the continuous growth of the original > test class. > My current idea is: > Since *FlinkSqlParserImplTest* currently inherits {*}SqlParserTest{*}, and > *SqlParserTest* itself contains many unit tests, for the convenience of > future test splits, we should introduce a basic *ParserTestBase* inheriting > {*}SqlParserTest{*}, and disable the original related unit tests in > {*}SqlParserTest{*}. > This will facilitate writing relevant unit tests more quickly during > subsequent splitting, without the need to repeatedly execute the unit tests > inside SqlParserTest. 
> > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59113=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846626#comment-17846626 ] Shuai Xu commented on FLINK-34380: -- Hi [~rovboyko], sorry for the late reply. For the incorrect order of output records, the minibatch optimization is designed to guarantee final consistency. And the fix you mentioned has been considered when the PR was reviewed. Flink is a distributed realtime processing system. The order of output could be guaranteed on a node by using LinkedHashMap, however, it could not be guaranteed when the join operator runs on multiple nodes. So I think it makes little sense to keep the order here. For the RowKind, it was also reviewed. As you mentioned, it is a common problem of the MiniBatch functionality. It does not influence the final result. From the benefit perspective, this problem could be tolerable. > Strange RowKind and records about intermediate output when using minibatch > join > --- > > Key: FLINK-34380 > URL: https://issues.apache.org/jira/browse/FLINK-34380 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.20.0 > > > {code:java} > // Add it in CalcItCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > changelogRow("+I", java.lang.Integer.valueOf(1), "1"), > changelogRow("-U", java.lang.Integer.valueOf(1), "1"), > changelogRow("+U", java.lang.Integer.valueOf(1), "99"), > changelogRow("-D", java.lang.Integer.valueOf(1), "99") > ) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" > |CREATE TABLE t1 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" > |CREATE TABLE t2 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > 
tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = > t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > } {code} > Output: > {code:java} > ++-+-+-+-+ > | op | a | b | a0 | b0 | > ++-+-+-+-+ > | +U | 1 | 1 | 1 | 99 | > | +U | 1 | 99 | 1 | 99 | > | -U | 1 | 1 | 1 | 99 | > | -D | 1 | 99 | 1 | 99 | > ++-+-+-+-+{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846626#comment-17846626 ] Shuai Xu edited comment on FLINK-34380 at 5/16/24 1:58 AM: --- Hi [~rovboyko], sorry for the late reply. For the incorrect order of output records, the minibatch optimization is designed to guarantee final consistency. And the fix you mentioned has been considered when the PR was reviewed. Flink is a distributed realtime processing system. The order of output could be guaranteed on a node by using LinkedHashMap, however, it could not be guaranteed when the join operator runs on multiple nodes. So I think it makes little sense to keep the order here. For the RowKind, it was also reviewed. As you mentioned, it is a common problem of the MiniBatch functionality. It does not influence the final result. From the benefit perspective, this problem could be tolerable. was (Author: JIRAUSER300096): Hi [~rovboyko] , sorry for late reply. For the incorrect order of output records, the minibatch optimization is designed to guanrantee final consistency. And the fix you mentioned has been considered when the pr was reviewed. Flink is a distributed realtime processing system. The order of output could be guanranteed on a node by using LinkedHashMap, however, it could not be guranteed when join operator runs on multiple nodes. So I think it makes little sense to keep the order here. For the Rowkind, it was also reviewed. As you mentioned, it is a common problem of MiniBatch functionality. It does not influence final result. From the benefit perspective, this problem could be tolerable. 
> Strange RowKind and records about intermediate output when using minibatch > join > --- > > Key: FLINK-34380 > URL: https://issues.apache.org/jira/browse/FLINK-34380 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.20.0 > > > {code:java} > // Add it in CalcItCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > changelogRow("+I", java.lang.Integer.valueOf(1), "1"), > changelogRow("-U", java.lang.Integer.valueOf(1), "1"), > changelogRow("+U", java.lang.Integer.valueOf(1), "99"), > changelogRow("-D", java.lang.Integer.valueOf(1), "99") > ) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" > |CREATE TABLE t1 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" > |CREATE TABLE t2 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = > t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > } {code} > Output: > {code:java} > ++-+-+-+-+ > | op | a | b | a0 | b0 | > ++-+-+-+-+ > | +U | 1 | 1 | 1 | 99 | > | +U | 1 | 99 | 1 | 99 | > | -U | 1 | 1 | 1 | 99 | > | -D | 1 | 99 | 1 | 99 | > ++-+-+-+-+{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
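The single-node ordering point in the comment above can be illustrated in isolation (plain Java, not Flink code; the map here merely stands in for a per-operator bundle):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// A LinkedHashMap iterates in insertion order, so a bundle keyed this way
// could replay records in arrival order on a single operator instance.
// Across parallel instances there is no global arrival order to preserve,
// which is the argument made above for not paying for ordering here.
public class BundleOrder {
    public static void main(String[] args) {
        Map<Long, String> bundle = new LinkedHashMap<>();
        bundle.put(3L, "first");
        bundle.put(1L, "second");
        bundle.put(2L, "third");
        List<Long> keys = new ArrayList<>(bundle.keySet());
        System.out.println(keys); // prints [3, 1, 2] -- insertion order kept
    }
}
```

A plain HashMap gives no such iteration-order guarantee, so even the per-node ordering depends on this choice of map.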
[jira] (FLINK-34500) Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL Sources
[ https://issues.apache.org/jira/browse/FLINK-34500 ] Shuai Xu deleted comment on FLINK-34500: -- was (Author: JIRAUSER300096): Hi, Could I take this verification? > Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL > Sources > - > > Key: FLINK-34500 > URL: https://issues.apache.org/jira/browse/FLINK-34500 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Parent, Table SQL / API >Affects Versions: 1.19.0 >Reporter: SuDewei >Assignee: Yun Tang >Priority: Blocker > Fix For: 1.19.0 > > > This issue aims to verify > [FLIP-367|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150]. > Volunteers can verify it by following the [doc > changes|https://github.com/apache/flink/pull/24234]. Since currently only the > pre-defined DataGen connector and user-defined connector supports setting > source parallelism, volunteers can verify it through DataGen Connector. > The basic steps include: > 1. Start a Flink cluster and submit a Flink SQL Job to the cluster. > 2. In this Flink Job, use the DataGen SQL Connector to generate data. > 3. Specify the parameter scan.parallelism in DataGen connector options as > user-defined parallelism instead of default parallelism. > 4. Observe whether the parallelism of the source has changed on the job graph > of the Flink Application UI, and whether the shuffle mode is correct. > If everything is normal, you will see that the parallelism of the source > operator is indeed different from that of downstream, and the shuffle mode is > rebalanced by default. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34500) Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL Sources
[ https://issues.apache.org/jira/browse/FLINK-34500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819964#comment-17819964 ] Shuai Xu commented on FLINK-34500: -- Hi, Could I take this verification? > Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL > Sources > - > > Key: FLINK-34500 > URL: https://issues.apache.org/jira/browse/FLINK-34500 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Parent, Table SQL / API >Affects Versions: 1.19.0 >Reporter: SuDewei >Assignee: Yun Tang >Priority: Blocker > Fix For: 1.19.0 > > > This issue aims to verify > [FLIP-367|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150]. > Volunteers can verify it by following the [doc > changes|https://github.com/apache/flink/pull/24234]. Since currently only the > pre-defined DataGen connector and user-defined connector supports setting > source parallelism, volunteers can verify it through DataGen Connector. > The basic steps include: > 1. Start a Flink cluster and submit a Flink SQL Job to the cluster. > 2. In this Flink Job, use the DataGen SQL Connector to generate data. > 3. Specify the parameter scan.parallelism in DataGen connector options as > user-defined parallelism instead of default parallelism. > 4. Observe whether the parallelism of the source has changed on the job graph > of the Flink Application UI, and whether the shuffle mode is correct. > If everything is normal, you will see that the parallelism of the source > operator is indeed different from that of downstream, and the shuffle mode is > rebalanced by default. -- This message was sent by Atlassian Jira (v8.20.10#820010)
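The verification steps above can be sketched as a single DDL (a hedged sketch: 'scan.parallelism' is the option described in the linked FLIP-367 doc changes; the table name and schema are illustrative):

{code:sql}
-- Illustrative only: pin the datagen source to parallelism 3 while the
-- rest of the job keeps its default parallelism; the source-to-downstream
-- edge should then appear as rebalance in the job graph.
CREATE TABLE gen_source (
  id INT,
  name STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'scan.parallelism' = '3'
);
{code}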
[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822433#comment-17822433 ] Shuai Xu commented on FLINK-34380: -- Let me take a look. > Strange RowKind and records about intermediate output when using minibatch > join > --- > > Key: FLINK-34380 > URL: https://issues.apache.org/jira/browse/FLINK-34380 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.19.0 > > > {code:java} > // Add it in CalcItCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > changelogRow("+I", java.lang.Integer.valueOf(1), "1"), > changelogRow("-U", java.lang.Integer.valueOf(1), "1"), > changelogRow("+U", java.lang.Integer.valueOf(1), "99"), > changelogRow("-D", java.lang.Integer.valueOf(1), "99") > ) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" > |CREATE TABLE t1 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" > |CREATE TABLE t2 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = > t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > } {code} > Output: > {code:java} > ++-+-+-+-+ > | op | a | b | a0 | b0 | > ++-+-+-+-+ > | +U | 1 | 1 | 1 | 99 | > | +U | 1 | 99 | 1 | 99 | > | -U | 1 | 1 | 1 | 99 | > | 
-D | 1 | 99 | 1 | 99 | > ++-+-+-+-+{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)