[jira] [Commented] (FLINK-28650) Flink SQL Parsing bug for METADATA

2023-10-16 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775580#comment-17775580
 ] 

Shuai Xu commented on FLINK-28650:
--

The bug in partial insert with writable metadata is fixed in FLINK-30922.

> Flink SQL Parsing bug for METADATA
> --
>
> Key: FLINK-28650
> URL: https://issues.apache.org/jira/browse/FLINK-28650
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.4
>Reporter: Jun Qin
>Priority: Major
> Fix For: 1.19.0
>
>
> With the following source/sink tables:
> {code:sql}
> CREATE TABLE sourceTable ( 
> `key` INT, 
> `time` TIMESTAMP(3),
> `value` STRING NOT NULL, 
> id INT 
> ) 
> WITH ( 
> 'connector' = 'datagen', 
> 'rows-per-second'='10', 
> 'fields.id.kind'='sequence', 
> 'fields.id.start'='1', 
> 'fields.id.end'='100' 
> );
> CREATE TABLE sinkTable1 ( 
> `time` TIMESTAMP(3) METADATA FROM 'timestamp', 
> `value` STRING NOT NULL
> ) 
> WITH ( 
>   'connector' = 'kafka',
> ...
> )
> CREATE TABLE sinkTable2 ( 
> `time` TIMESTAMP(3),-- without METADATA
> `value` STRING NOT NULL
> ) 
> WITH ( 
>   'connector' = 'kafka',
> ...
> )
> {code}
> the following three pass the validation:
> {code:sql}
> INSERT INTO sinkTable1
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> INSERT INTO sinkTable2
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> INSERT INTO sinkTable2 (`time`,`value`)
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> {code}
> but this one does not:
> {code:sql}
> INSERT INTO sinkTable1 (`time`,`value`)
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> {code}
> It failed with 
> {code:java}
> Unknown target column 'time'
> {code}
> It seems when providing column names in INSERT, the METADATA have an 
> undesired effect. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28650) Flink SQL Parsing bug for METADATA

2023-10-16 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775580#comment-17775580
 ] 

Shuai Xu edited comment on FLINK-28650 at 10/16/23 7:58 AM:


This bug  is fixed in FLINK-30922.


was (Author: JIRAUSER300096):
The bug in partial insert with writable metadata is fixed in FLINK-30922.

> Flink SQL Parsing bug for METADATA
> --
>
> Key: FLINK-28650
> URL: https://issues.apache.org/jira/browse/FLINK-28650
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.4
>Reporter: Jun Qin
>Priority: Major
> Fix For: 1.19.0
>
>
> With the following source/sink tables:
> {code:sql}
> CREATE TABLE sourceTable ( 
> `key` INT, 
> `time` TIMESTAMP(3),
> `value` STRING NOT NULL, 
> id INT 
> ) 
> WITH ( 
> 'connector' = 'datagen', 
> 'rows-per-second'='10', 
> 'fields.id.kind'='sequence', 
> 'fields.id.start'='1', 
> 'fields.id.end'='100' 
> );
> CREATE TABLE sinkTable1 ( 
> `time` TIMESTAMP(3) METADATA FROM 'timestamp', 
> `value` STRING NOT NULL
> ) 
> WITH ( 
>   'connector' = 'kafka',
> ...
> )
> CREATE TABLE sinkTable2 ( 
> `time` TIMESTAMP(3),-- without METADATA
> `value` STRING NOT NULL
> ) 
> WITH ( 
>   'connector' = 'kafka',
> ...
> )
> {code}
> the following three pass the validation:
> {code:sql}
> INSERT INTO sinkTable1
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> INSERT INTO sinkTable2
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> INSERT INTO sinkTable2 (`time`,`value`)
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> {code}
> but this one does not:
> {code:sql}
> INSERT INTO sinkTable1 (`time`,`value`)
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> {code}
> It failed with 
> {code:java}
> Unknown target column 'time'
> {code}
> It seems when providing column names in INSERT, the METADATA have an 
> undesired effect. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25054) Improve exception message for unsupported hashLength for SHA2 function

2023-08-21 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756784#comment-17756784
 ] 

Shuai Xu commented on FLINK-25054:
--

Hi, I would like to fix this issue. Could it be assigned to me, please?

> Improve exception message for unsupported hashLength for SHA2 function
> --
>
> Key: FLINK-25054
> URL: https://issues.apache.org/jira/browse/FLINK-25054
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.12.3
>Reporter: DingGeGe
>Priority: Major
> Attachments: image-2021-11-25-16-59-56-699.png
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> 【exception sql】
> SELECT
> SHA2(, 128)
> FROM
>  
> 【effect】
> when sql is long , it`s hard to clear where is the problem on this issue
> 【reason】
> build-in function SHA2, hashLength do not support “128”, but I could not 
> understand from
> 【Exception log】
> !image-2021-11-25-16-59-56-699.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation

2023-08-21 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756785#comment-17756785
 ] 

Shuai Xu commented on FLINK-27741:
--

Hi, I would like to fix this issue. Could it be assigned to me, please?

> Fix NPE when use dense_rank() and rank() in over aggregation
> 
>
> Key: FLINK-27741
> URL: https://issues.apache.org/jira/browse/FLINK-27741
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: chenzihao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> There has an 'NullPointException' when use RANK() and DENSE_RANK() in over 
> window.
> {code:java}
> @Test
>   def testDenseRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY 
> proctime) FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> {code:java}
> @Test
>   def testRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) 
> FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> Exception Info:
> {code:java}
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248)
>   at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248)
>   at scala.collection.SeqLike.size(SeqLike.scala:104)
>   at scala.collection.SeqLike.size$(SeqLike.scala:104)
>   at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95)
>   at 
> scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242)
>   at scala.collection.mutable.Builder.sizeHint(Builder.scala:77)
>   at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76)
>   at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21)
>   at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:232)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:279)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:198)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
>   at 
> 

[jira] [Comment Edited] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation

2023-08-21 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756785#comment-17756785
 ] 

Shuai Xu edited comment on FLINK-27741 at 8/21/23 9:58 AM:
---

Hi [~chenzihao] , it appears that your pull request has not been approved. 
Would you like to continue working on it? If not, I would be happy to fix it.


was (Author: JIRAUSER300096):
Hi, I would like to fix this issue. Could it be assigned to me, please?

> Fix NPE when use dense_rank() and rank() in over aggregation
> 
>
> Key: FLINK-27741
> URL: https://issues.apache.org/jira/browse/FLINK-27741
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: chenzihao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> There has an 'NullPointException' when use RANK() and DENSE_RANK() in over 
> window.
> {code:java}
> @Test
>   def testDenseRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY 
> proctime) FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> {code:java}
> @Test
>   def testRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) 
> FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> Exception Info:
> {code:java}
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248)
>   at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248)
>   at scala.collection.SeqLike.size(SeqLike.scala:104)
>   at scala.collection.SeqLike.size$(SeqLike.scala:104)
>   at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95)
>   at 
> scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242)
>   at scala.collection.mutable.Builder.sizeHint(Builder.scala:77)
>   at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76)
>   at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21)
>   at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:232)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:279)
>   at 
> 

[jira] [Commented] (FLINK-28866) Use DDL instead of legacy method to register the test source in JoinITCase

2023-08-21 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757169#comment-17757169
 ] 

Shuai Xu commented on FLINK-28866:
--

Hi, I would like to fix this issue. Could it be assigned to me?

> Use DDL instead of legacy method to register the test source in JoinITCase
> --
>
> Key: FLINK-28866
> URL: https://issues.apache.org/jira/browse/FLINK-28866
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31952) Support 'EXPLAIN' statement for CompiledPlan

2023-04-27 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717031#comment-17717031
 ] 

Shuai Xu edited comment on FLINK-31952 at 4/27/23 7:31 AM:
---

Hi Jane, I have some ideas for the task and this is how I consider it. 
Purpose of the task:
Support explain [plan for | ] '/path-to-json.json'
The steps I think could resolve the task :
1. Modify parserImpl.ftl template to add syntax support, which can be verified 
by FlinkSqlParserImplTest
2. Translate the sql to Operation.
3. TableEnv#explainPlan is a ready API, we just got the internal plan from the 
Operation  and pass it to TableEnv#explainPlan.
4. Finally add the test in and SqlOperationConverterTest and 
TableEnvironmentTest for verification.
Could I take the task?


was (Author: JIRAUSER300096):
Hi Jane, I have some ideas for the task and this is how I consider it. 
Purpose of the task:
Support explain [plan for | ] '/path-to-json.json'
The steps I think could resolve the task :
1. Modify parserImpl.ftl template to add syntax support, which can be verified 
by FlinkSqlParserImplTest
2. Translate the sql to Operation.
3. TableEnv#explainPlan is a ready API, we just got the internal plan from the 
Operation  and pass it to TableEnv#explainPlan.
4. Finally add the test in and SqlOperationConverterTest and 
TableEnvironmentTest for verification.
Could I take the task?

> Support 'EXPLAIN' statement for CompiledPlan
> 
>
> Key: FLINK-31952
> URL: https://issues.apache.org/jira/browse/FLINK-31952
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Jane Chan
>Priority: Major
> Fix For: 1.18.0
>
>
>  Support the explain SQL syntax towards serialized CompiledPlan
> {code:sql}
> EXPLAIN [  | PLAN FOR]  
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31952) Support 'EXPLAIN' statement for CompiledPlan

2023-04-27 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717031#comment-17717031
 ] 

Shuai Xu commented on FLINK-31952:
--

Hi Jane, I have some ideas for the task and this is how I consider it. 
Purpose of the task:
Support explain [plan for | ] '/path-to-json.json'
The steps I think could resolve the task :
1. Modify parserImpl.ftl template to add syntax support, which can be verified 
by FlinkSqlParserImplTest
2. Translate the sql to Operation.
3. TableEnv#explainPlan is a ready API, we just got the internal plan from the 
Operation  and pass it to TableEnv#explainPlan.
4. Finally add the test in and SqlOperationConverterTest and 
TableEnvironmentTest for verification.
Could I take the task?

> Support 'EXPLAIN' statement for CompiledPlan
> 
>
> Key: FLINK-31952
> URL: https://issues.apache.org/jira/browse/FLINK-31952
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Jane Chan
>Priority: Major
> Fix For: 1.18.0
>
>
>  Support the explain SQL syntax towards serialized CompiledPlan
> {code:sql}
> EXPLAIN [  | PLAN FOR]  
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31956) Extend the CompiledPlan to read from/write to Flink's FileSystem

2023-04-27 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717082#comment-17717082
 ] 

Shuai Xu commented on FLINK-31956:
--

Hi Jane, I have taken the FLINK-31952 which is similar to the task. I think it 
could be achieved by modifying the streamplanner and tableEnvironmentImpl.Could 
I take the task?

> Extend the CompiledPlan to read from/write to Flink's FileSystem
> 
>
> Key: FLINK-31956
> URL: https://issues.apache.org/jira/browse/FLINK-31956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client, Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Jane Chan
>Priority: Major
> Fix For: 1.18.0
>
>
> At present, COMPILE/EXECUTE PLAN FOR '${plan.json}' only supports writing 
> to/reading from a local file without the scheme. We propose to extend the 
> support for Flink's FileSystem.
> {code:sql}
> -- before
> COMPILE PLAN FOR '/tmp/foo/bar.json' 
> EXECUTE PLAN FOR '/tmp/foo/bar.json' 
> -- after
> COMPILE PLAN FOR 'file:///tmp/foo/bar.json' 
> COMPILE PLAN FOR 'hdfs:///tmp/foo/bar.json' 
> COMPILE PLAN FOR 's3:///tmp/foo/bar.json' 
> COMPILE PLAN FOR 'oss:///tmp/foo/bar.json'  
> EXECUTE PLAN FOR 'file:///tmp/foo/bar.json'
> EXECUTE PLAN FOR 'hdfs:///tmp/foo/bar.json'
> EXECUTE PLAN FOR 's3:///tmp/foo/bar.json'
> EXECUTE PLAN FOR 'oss:///tmp/foo/bar.json' {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32219) SQL client hangs when executing EXECUTE PLAN

2023-05-30 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-32219:
-
Description: 
I compiled a plan for an insert statement and then executed the plan. However, 
the SQL client becomes unresponsive when executing the EXECUTE PLAN statement. 
I have checked the Flink dashboard and confirmed that the job is running 
normally. The only issue is that the SQL client is stuck and cannot accept new 
commands. Here is a part of the stack trace for reference.:
{code:java}
"pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
waiting on condition [0x000173e01000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00076e72af20> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
at 
org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37)
at 
org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at 
org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115)
at 
org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown
 Source)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
{code}

  was:
I compiled plan for insert statement firstly and then I execute the plan. 
However the sql client is pending after running execute plan statement. Here is 
the part of stacktrace:

{code:java}
"pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
waiting on condition [0x000173e01000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00076e72af20> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
at 
org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37)
at 
org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at 

[jira] [Created] (FLINK-32219) sql client would be pending after executing plan of inserting

2023-05-30 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-32219:


 Summary: sql client would be pending after executing plan of 
inserting
 Key: FLINK-32219
 URL: https://issues.apache.org/jira/browse/FLINK-32219
 Project: Flink
  Issue Type: Bug
Reporter: Shuai Xu


I compiled plan for insert statement firstly and then I execute the plan. 
However the sql client is pending after running execute plan statement. Here is 
the part of stacktrace:

{code:java}
"pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
waiting on condition [0x000173e01000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00076e72af20> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
at 
org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37)
at 
org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at 
org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115)
at 
org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown
 Source)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32219) sql client would be pending after executing plan of inserting

2023-05-30 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-32219:
-
Affects Version/s: 1.17.1

> sql client would be pending after executing plan of inserting
> -
>
> Key: FLINK-32219
> URL: https://issues.apache.org/jira/browse/FLINK-32219
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.17.1
>Reporter: Shuai Xu
>Priority: Major
>
> I compiled plan for insert statement firstly and then I execute the plan. 
> However the sql client is pending after running execute plan statement. Here 
> is the part of stacktrace:
> {code:java}
> "pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
> waiting on condition [0x000173e01000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x00076e72af20> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
>   at 
> org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37)
>   at 
> org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106)
>   at java.util.Iterator.forEachRemaining(Iterator.java:115)
>   at 
> org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115)
>   at 
> org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
>   at 
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
>   at 
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown
>  Source)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown
>  Source)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown
>  Source)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32219) SQL client hangs when executing EXECUTE PLAN

2023-05-30 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-32219:
-
Description: 
I compiled a plan for an INSERT statement and executed the plan, but the SQL 
client became unresponsive when executing the EXECUTE PLAN statement. I 
confirmed that the Flink job is running normally by checking the Flink 
dashboard. The only issue is that the SQL client becomes stuck and cannot 
accept new commands. I printed the stack trace of the SQL client process, and 
here is a part of it for reference.
{code:java}
"pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
waiting on condition [0x000173e01000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00076e72af20> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
at 
org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37)
at 
org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at 
org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115)
at 
org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown
 Source)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
{code}

  was:
I compiled a plan for an insert statement and then executed the plan. However, 
the SQL client becomes unresponsive when executing the EXECUTE PLAN statement. 
I have checked the Flink dashboard and confirmed that the job is running 
normally. The only issue is that the SQL client is stuck and cannot accept new 
commands. Here is a part of the stack trace for reference.:
{code:java}
"pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
waiting on condition [0x000173e01000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00076e72af20> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
at 

[jira] [Commented] (FLINK-32824) Port Calcite's fix for the sql like operator

2023-08-09 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752618#comment-17752618
 ] 

Shuai Xu commented on FLINK-32824:
--

Hi, lincoln. I‘d like to fix this issue, could you assign it to me?

> Port Calcite's fix for the sql like operator
> 
>
> Key: FLINK-32824
> URL: https://issues.apache.org/jira/browse/FLINK-32824
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0, 1.17.1
>Reporter: lincoln lee
>Priority: Major
> Fix For: 1.19.0
>
>
> we should port the bugfix of sql like operator 
> https://issues.apache.org/jira/browse/CALCITE-1898
> {code}
> The LIKE operator must match '.' (period) literally, not treat it as a 
> wild-card. Currently it treats it the same as '_'.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34221) Introduce operator for minibatch join

2024-01-23 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34221:


 Summary: Introduce operator for minibatch join
 Key: FLINK-34221
 URL: https://issues.apache.org/jira/browse/FLINK-34221
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Shuai Xu
 Fix For: 1.19.0


Introduce operator that implements minibatch join



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34222) Get minibatch join operator involved

2024-01-23 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34222:


 Summary: Get minibatch join operator involved
 Key: FLINK-34222
 URL: https://issues.apache.org/jira/browse/FLINK-34222
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Shuai Xu
 Fix For: 1.19.0


Get minibatch join operator involved which includes both plan and operator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34220) introduce buffer bundle for minibatch join

2024-01-23 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34220:


 Summary: introduce buffer bundle for minibatch join
 Key: FLINK-34220
 URL: https://issues.apache.org/jira/browse/FLINK-34220
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Shuai Xu
 Fix For: 1.19.0


introduce buffer bundle for storing records to implement minibatch join



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34219) Introduce a new join operator to support minibatch

2024-01-23 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34219:


 Summary: Introduce a new join operator to support minibatch
 Key: FLINK-34219
 URL: https://issues.apache.org/jira/browse/FLINK-34219
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Runtime
Reporter: Shuai Xu
 Fix For: 1.19.0


This is the parent task of FLIP-415.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34355) Release Testing: Verify FLINK-34054 Support named parameters for functions and procedures

2024-02-04 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814193#comment-17814193
 ] 

Shuai Xu edited comment on FLINK-34355 at 2/5/24 7:19 AM:
--

Hi, I'd like to take this verification. cc [~hackergin] .


was (Author: JIRAUSER300096):
Hi, I'd like to take this verification.

> Release Testing: Verify FLINK-34054 Support named parameters for functions 
> and procedures
> -
>
> Key: FLINK-34355
> URL: https://issues.apache.org/jira/browse/FLINK-34355
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Test suggestion:
> 1. Implement a test UDF or Procedure and support Named Parameters.
> 2. When calling a function or procedure, use named parameters to verify if 
> the results are as expected.
> You can test the following scenarios:
> 1. Normal usage of named parameters, fully specifying each parameter.
> 2. Omitting unnecessary parameters.
> 3. Omitting necessary parameters to confirm if an error is reported.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34355) Release Testing: Verify FLINK-34054 Support named parameters for functions and procedures

2024-02-04 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814193#comment-17814193
 ] 

Shuai Xu commented on FLINK-34355:
--

Hi, I'd like to take this verification.

> Release Testing: Verify FLINK-34054 Support named parameters for functions 
> and procedures
> -
>
> Key: FLINK-34355
> URL: https://issues.apache.org/jira/browse/FLINK-34355
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Test suggestion:
> 1. Implement a test UDF or Procedure and support Named Parameters.
> 2. When calling a function or procedure, use named parameters to verify if 
> the results are as expected.
> You can test the following scenarios:
> 1. Normal usage of named parameters, fully specifying each parameter.
> 2. Omitting unnecessary parameters.
> 3. Omitting necessary parameters to confirm if an error is reported.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34355) Release Testing: Verify FLINK-34054 Support named parameters for functions and procedures

2024-02-19 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818611#comment-17818611
 ] 

Shuai Xu commented on FLINK-34355:
--

Hi, I have finished this verification.

> Release Testing: Verify FLINK-34054 Support named parameters for functions 
> and procedures
> -
>
> Key: FLINK-34355
> URL: https://issues.apache.org/jira/browse/FLINK-34355
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Shuai Xu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Test suggestion:
> 1. Implement a test UDF or Procedure and support Named Parameters.
> 2. When calling a function or procedure, use named parameters to verify if 
> the results are as expected.
> You can test the following scenarios:
> 1. Normal usage of named parameters, fully specifying each parameter.
> 2. Omitting unnecessary parameters.
> 3. Omitting necessary parameters to confirm if an error is reported.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34346) Release Testing: Verify FLINK-24024 Support session Window TVF

2024-02-19 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818612#comment-17818612
 ] 

Shuai Xu commented on FLINK-34346:
--

Hi, I have finished this testing. The exception I think could be improved has 
been linked to this jira.

> Release Testing: Verify FLINK-24024 Support session Window TVF
> --
>
> Key: FLINK-34346
> URL: https://issues.apache.org/jira/browse/FLINK-34346
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: xuyang
>Assignee: Shuai Xu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Session window TVF is ready. Users can use Session window TVF aggregation 
> instead of using legacy session group window aggregation.
> Someone can verify this feature by following the 
> [doc]([https://github.com/apache/flink/pull/24250]) although it is still 
> being reviewed. 
> Further more,  although session window join, session window rank and session 
> window deduplicate are in experimental state, If someone finds some bugs 
> about them, you could also open a Jira linked this one to report them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34219) Introduce a new join operator to support minibatch

2024-02-20 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-34219:
-
Release Note: Support minibatch regular join to reduce intermediate result 
and resolve record amplification in cascading join scenarios.

> Introduce a new join operator to support minibatch
> --
>
> Key: FLINK-34219
> URL: https://issues.apache.org/jira/browse/FLINK-34219
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Shuai Xu
>Assignee: Shuai Xu
>Priority: Major
> Fix For: 1.19.0
>
>
> This is the parent task of FLIP-415.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34462) Session window with negative parameter throws unclear exception

2024-02-19 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34462:


 Summary: Session window with negative parameter throws unclear 
exception
 Key: FLINK-34462
 URL: https://issues.apache.org/jira/browse/FLINK-34462
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.19.0
Reporter: Shuai Xu


Set invalid parameter in session window get unclear error.
{code:java}
// add test in WindowAggregateITCase
def testEventTimeSessionWindowWithInvalidName(): Unit = {
  val sql =
"""
  |SELECT
  |  window_start,
  |  window_end,
  |  COUNT(*),
  |  SUM(`bigdec`),
  |  MAX(`double`),
  |  MIN(`float`),
  |  COUNT(DISTINCT `string`),
  |  concat_distinct_agg(`string`)
  |FROM TABLE(
  |   SESSION(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '-5' SECOND))
  |GROUP BY window_start, window_end
""".stripMargin

  val sink = new TestingAppendSink
  tEnv.sqlQuery(sql).toDataStream.addSink(sink)
  env.execute()
} 

{code}
{code:java}
java.lang.AssertionError: Sql optimization: Assertion error: null at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:79)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
 at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) 
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
 at scala.collection.Iterator.foreach(Iterator.scala:937) at 
scala.collection.Iterator.foreach$(Iterator.scala:937) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at 
scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:69) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:320)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:178)
 at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
 at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
 at 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:151)
 at 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:128)
 at 
org.apache.flink.table.api.bridge.scala.TableConversions.toDataStream(TableConversions.scala:60)
 at 
org.apache.flink.table.planner.runtime.stream.sql.WindowAggregateITCase.testEventTimeSessionWindowWithInvalidName(WindowAggregateITCase.scala:1239)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at 
java.util.Iterator.forEachRemaining(Iterator.java:116) at 
scala.collection.convert.Wrappers$IteratorWrapper.forEachRemaining(Wrappers.scala:26)
 at 
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) 
at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
 at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at 

[jira] [Commented] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-19 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818384#comment-17818384
 ] 

Shuai Xu commented on FLINK-34378:
--

Hi [~xuyangzhong] . This is an expected behavior. To maintain order, additional 
data structures would need to be introduced, which would result in a 
performance degradation and the ordered effect would only materialize when 
parallelism is set to 1. If order preservation is required with a parallelism 
of 1, it suffices to simply turn off the minibatch feature.

> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> I'm not sure if it's a bug. The following case can re-produce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
> s"""
>|CREATE TABLE t1 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
> s"""
>|CREATE TABLE t2 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}
> Result
> {code:java}
> ++---+---+---+---+ 
> | op | a | b | a0| b0| 
> ++---+---+---+---+ 
> | +I | 3 | 3 | 3 | 3 | 
> | +I | 7 | 7 | 7 | 7 | 
> | +I | 2 | 2 | 2 | 2 | 
> | +I | 5 | 5 | 5 | 5 | 
> | +I | 1 | 1 | 1 | 1 | 
> | +I | 6 | 6 | 6 | 6 | 
> | +I | 4 | 4 | 4 | 4 | 
> | +I | 8 | 8 | 8 | 8 | 
> ++---+---+---+---+
> {code}
> When I do not use minibatch join, the result is :
> {code:java}
> ++---+---+++
> | op | a | b | a0 | b0 |
> ++---+---+++
> | +I | 1 | 1 |  1 |  1 |
> | +I | 2 | 2 |  2 |  2 |
> | +I | 3 | 3 |  3 |  3 |
> | +I | 4 | 4 |  4 |  4 |
> | +I | 5 | 5 |  5 |  5 |
> | +I | 6 | 6 |  6 |  6 |
> | +I | 7 | 7 |  7 |  7 |
> | +I | 8 | 8 |  8 |  8 |
> ++---+---+++
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33689) jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg

2023-11-28 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-33689:


 Summary: jsonObjectAggFunction can't retract previous data which 
is invalid when enable local global agg
 Key: FLINK-33689
 URL: https://issues.apache.org/jira/browse/FLINK-33689
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.18.0
Reporter: Shuai Xu


Run the test as following and enable LocalGlobal and minibatch  in 
sql/AggregateITCase . 
{code:java}
//代码占位符
def testGroupJsonObjectAggWithRetract(): Unit = {
  val data = new mutable.MutableList[(Long, String, Long)]
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  val sql =
s"""
   |select
   |   JSON_OBJECTAGG(key k value v)
   |from (select
   | cast(SUM(a) as string) as k,t as v
   |   from
   | Table6
   |   group by t)
   |""".stripMargin
  val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
  tEnv.createTemporaryView("Table6", t)
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
  val expected =
List(
  "{\"30\":2}"
)
  assertThat(sink.getRetractResults).isEqualTo(expected)
} {code}
The result is as following.
{code:java}
//代码占位符
List({"14":2,"30":2}) {code}
However,  \{"14":2}  should be retracted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33689) jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg

2023-11-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-33689:
-
Description: 
Run the test as following and enable LocalGlobal in sql/AggregateITCase . 
{code:java}
def testGroupJsonObjectAggWithRetract(): Unit = {
  val data = new mutable.MutableList[(Long, String, Long)]
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  val sql =
s"""
   |select
   |   JSON_OBJECTAGG(key k value v)
   |from (select
   | cast(SUM(a) as string) as k,t as v
   |   from
   | Table6
   |   group by t)
   |""".stripMargin
  val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
  tEnv.createTemporaryView("Table6", t)
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
  val expected =
List(
  "{\"30\":2}"
)
  assertThat(sink.getRetractResults).isEqualTo(expected)
} {code}
The result is as following.
{code:java}
List({"14":2,"30":2}) {code}
However,  \{"14":2}  should be retracted.

  was:
Run the test as following and enable LocalGlobal and minibatch  in 
sql/AggregateITCase . 
{code:java}
def testGroupJsonObjectAggWithRetract(): Unit = {
  val data = new mutable.MutableList[(Long, String, Long)]
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  val sql =
s"""
   |select
   |   JSON_OBJECTAGG(key k value v)
   |from (select
   | cast(SUM(a) as string) as k,t as v
   |   from
   | Table6
   |   group by t)
   |""".stripMargin
  val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
  tEnv.createTemporaryView("Table6", t)
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
  val expected =
List(
  "{\"30\":2}"
)
  assertThat(sink.getRetractResults).isEqualTo(expected)
} {code}
The result is as following.
{code:java}
List({"14":2,"30":2}) {code}
However,  \{"14":2}  should be retracted.


> jsonObjectAggFunction can't retract previous data which is invalid when 
> enable local global agg
> ---
>
> Key: FLINK-33689
> URL: https://issues.apache.org/jira/browse/FLINK-33689
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
>
> Run the test as following and enable LocalGlobal in sql/AggregateITCase . 
> {code:java}
> def testGroupJsonObjectAggWithRetract(): Unit = {
>   val data = new mutable.MutableList[(Long, String, Long)]
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   val sql =
> s"""
>|select
>|   JSON_OBJECTAGG(key k value v)
>|from (select
>| cast(SUM(a) as string) as k,t as v
>|   from
>| Table6
>|   group by t)
>|""".stripMargin
>   val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
>   tEnv.createTemporaryView("Table6", t)
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
>   env.execute()
>   val expected =
> List(
>   "{\"30\":2}"
> )
>   assertThat(sink.getRetractResults).isEqualTo(expected)
> } {code}
> The result is as following.
> {code:java}
> List({"14":2,"30":2}) {code}
> However,  \{"14":2}  should be retracted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33689) jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg

2023-11-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-33689:
-
Description: 
Run the test as following and enable LocalGlobal and minibatch  in 
sql/AggregateITCase . 
{code:java}
def testGroupJsonObjectAggWithRetract(): Unit = {
  val data = new mutable.MutableList[(Long, String, Long)]
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  val sql =
s"""
   |select
   |   JSON_OBJECTAGG(key k value v)
   |from (select
   | cast(SUM(a) as string) as k,t as v
   |   from
   | Table6
   |   group by t)
   |""".stripMargin
  val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
  tEnv.createTemporaryView("Table6", t)
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
  val expected =
List(
  "{\"30\":2}"
)
  assertThat(sink.getRetractResults).isEqualTo(expected)
} {code}
The result is as following.
{code:java}
List({"14":2,"30":2}) {code}
However,  \{"14":2}  should be retracted.

  was:
Run the test as following and enable LocalGlobal and minibatch  in 
sql/AggregateITCase . 
{code:java}
//代码占位符
def testGroupJsonObjectAggWithRetract(): Unit = {
  val data = new mutable.MutableList[(Long, String, Long)]
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  val sql =
s"""
   |select
   |   JSON_OBJECTAGG(key k value v)
   |from (select
   | cast(SUM(a) as string) as k,t as v
   |   from
   | Table6
   |   group by t)
   |""".stripMargin
  val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
  tEnv.createTemporaryView("Table6", t)
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
  val expected =
List(
  "{\"30\":2}"
)
  assertThat(sink.getRetractResults).isEqualTo(expected)
} {code}
The result is as following.
{code:java}
//代码占位符
List({"14":2,"30":2}) {code}
However,  \{"14":2}  should be retracted.


> jsonObjectAggFunction can't retract previous data which is invalid when 
> enable local global agg
> ---
>
> Key: FLINK-33689
> URL: https://issues.apache.org/jira/browse/FLINK-33689
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
>
> Run the test as following and enable LocalGlobal and minibatch  in 
> sql/AggregateITCase . 
> {code:java}
> def testGroupJsonObjectAggWithRetract(): Unit = {
>   val data = new mutable.MutableList[(Long, String, Long)]
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   val sql =
> s"""
>|select
>|   JSON_OBJECTAGG(key k value v)
>|from (select
>| cast(SUM(a) as string) as k,t as v
>|   from
>| Table6
>|   group by t)
>|""".stripMargin
>   val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
>   tEnv.createTemporaryView("Table6", t)
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
>   env.execute()
>   val expected =
> List(
>   "{\"30\":2}"
> )
>   assertThat(sink.getRetractResults).isEqualTo(expected)
> } {code}
> The result is as following.
> {code:java}
> List({"14":2,"30":2}) {code}
> However,  \{"14":2}  should be retracted.



--
This message was sent by Atlassian Jira

[jira] [Updated] (FLINK-33689) JsonObjectAggFunction can't retract previous data which is invalid when enable local global agg

2023-11-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-33689:
-
Summary: JsonObjectAggFunction can't retract previous data which is invalid 
when enable local global agg  (was: jsonObjectAggFunction can't retract 
previous data which is invalid when enable local global agg)

> JsonObjectAggFunction can't retract previous data which is invalid when 
> enable local global agg
> ---
>
> Key: FLINK-33689
> URL: https://issues.apache.org/jira/browse/FLINK-33689
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
>
> Run the test as following and enable LocalGlobal in sql/AggregateITCase . 
> {code:java}
> def testGroupJsonObjectAggWithRetract(): Unit = {
>   val data = new mutable.MutableList[(Long, String, Long)]
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   val sql =
> s"""
>|select
>|   JSON_OBJECTAGG(key k value v)
>|from (select
>| cast(SUM(a) as string) as k,t as v
>|   from
>| Table6
>|   group by t)
>|""".stripMargin
>   val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
>   tEnv.createTemporaryView("Table6", t)
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
>   env.execute()
>   val expected =
> List(
>   "{\"30\":2}"
> )
>   assertThat(sink.getRetractResults).isEqualTo(expected)
> } {code}
> The result is as following.
> {code:java}
> List({"14":2,"30":2}) {code}
> However,  \{"14":2}  should be retracted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33689) JsonObjectAggFunction can't retract previous data which is invalid when enabling local global agg

2023-11-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-33689:
-
Summary: JsonObjectAggFunction can't retract previous data which is invalid 
when enabling local global agg  (was: JsonObjectAggFunction can't retract 
previous data which is invalid when enable local global agg)

> JsonObjectAggFunction can't retract previous data which is invalid when 
> enabling local global agg
> -
>
> Key: FLINK-33689
> URL: https://issues.apache.org/jira/browse/FLINK-33689
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
>
> Run the test as following and enable LocalGlobal in sql/AggregateITCase . 
> {code:java}
> def testGroupJsonObjectAggWithRetract(): Unit = {
>   val data = new mutable.MutableList[(Long, String, Long)]
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   val sql =
> s"""
>|select
>|   JSON_OBJECTAGG(key k value v)
>|from (select
>| cast(SUM(a) as string) as k,t as v
>|   from
>| Table6
>|   group by t)
>|""".stripMargin
>   val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
>   tEnv.createTemporaryView("Table6", t)
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
>   env.execute()
>   val expected =
> List(
>   "{\"30\":2}"
> )
>   assertThat(sink.getRetractResults).isEqualTo(expected)
> } {code}
> The result is as following.
> {code:java}
> List({"14":2,"30":2}) {code}
> However,  \{"14":2}  should be retracted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33549) Exception "Factory does not implement interface YieldingOperatorFactory" thrown in batch mode

2023-11-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788967#comment-17788967
 ] 

Shuai Xu commented on FLINK-33549:
--

I'd like to fix this. Could you assign it to me? [~luoyuxia] 

> Exception "Factory does not implement interface YieldingOperatorFactory" 
> thrown in batch mode 
> --
>
> Key: FLINK-33549
> URL: https://issues.apache.org/jira/browse/FLINK-33549
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: luoyuxia
>Priority: Major
>
> When run a job in batch, it throws the following exception
> {code:java}
> java.lang.NullPointerException: Factory does not implement interface 
> org.apache.flink.streaming.api.operators.YieldingOperatorFactory
>     at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory.getMailboxExecutor(AbstractStreamOperatorFactory.java:67)
>     at 
> org.apache.flink.table.runtime.operators.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:79)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.createOperator(TableOperatorWrapper.java:115)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.createAllOperators(MultipleInputStreamOperatorBase.java:259)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.(MultipleInputStreamOperatorBase.java:88)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.(BatchMultipleInputStreamOperator.java:48)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory.createStreamOperator(BatchMultipleInputStreamOperatorFactory.java:51)
>     at 
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:212)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:60)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:756)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:743)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>     at java.lang.Thread.run(Thread.java:834) {code}
>  
> When I disable multiple-input by setting 
> table.optimizer.multiple-input-enabled = false, it works then. 
> Should be introduced by FLINK-23621.
> [In 
> here|https://github.com/apache/flink/blob/a1aed4f877099328d4833f8a2781d2edbaaddc70/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java#L60],
>  when the operator factory is instanceof YieldingOperatorFactory, it will set 
> mailbox executor. But when it's 
> BatchMultipleInputStreamOperatorFactory, althogh it'll still set mailbox 
> executor but it won't set the mailbox executor. for the operators wrapped by 
> the BatchMultipleInputStreamOperator. Then the exception is thrown.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33549) Exception "Factory does not implement interface YieldingOperatorFactory" thrown in batch mode

2023-11-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788967#comment-17788967
 ] 

Shuai Xu edited comment on FLINK-33549 at 11/23/23 3:39 AM:


Hi [~luoyuxia] , I'd like to fix this. Could you assign it to me?


was (Author: JIRAUSER300096):
I'd like to fix this. Could you assign it to me? [~luoyuxia] 

> Exception "Factory does not implement interface YieldingOperatorFactory" 
> thrown in batch mode 
> --
>
> Key: FLINK-33549
> URL: https://issues.apache.org/jira/browse/FLINK-33549
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: luoyuxia
>Priority: Major
>
> When run a job in batch, it throws the following exception
> {code:java}
> java.lang.NullPointerException: Factory does not implement interface 
> org.apache.flink.streaming.api.operators.YieldingOperatorFactory
>     at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory.getMailboxExecutor(AbstractStreamOperatorFactory.java:67)
>     at 
> org.apache.flink.table.runtime.operators.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:79)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.createOperator(TableOperatorWrapper.java:115)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.createAllOperators(MultipleInputStreamOperatorBase.java:259)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.(MultipleInputStreamOperatorBase.java:88)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.(BatchMultipleInputStreamOperator.java:48)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory.createStreamOperator(BatchMultipleInputStreamOperatorFactory.java:51)
>     at 
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:212)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:60)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:756)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:743)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>     at java.lang.Thread.run(Thread.java:834) {code}
>  
> When I disable multiple-input by setting 
> table.optimizer.multiple-input-enabled = false, it works then. 
> Should be introduced by FLINK-23621.
> [In 
> here|https://github.com/apache/flink/blob/a1aed4f877099328d4833f8a2781d2edbaaddc70/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java#L60],
>  when the operator factory is instanceof YieldingOperatorFactory, it will set 
> mailbox executor. But when it's 
> BatchMultipleInputStreamOperatorFactory, althogh it'll still set mailbox 
> executor but it won't set the mailbox executor. for the operators wrapped by 
> the BatchMultipleInputStreamOperator. Then the exception is thrown.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34222) Get minibatch join operator involved

2024-01-25 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-34222:
-
Description: Get minibatch join operator involved in which includes both 
plan and operator. Implement minibatch join in E2E.  (was: Get minibatch join 
operator involved which includes both plan and operator)

> Get minibatch join operator involved
> 
>
> Key: FLINK-34222
> URL: https://issues.apache.org/jira/browse/FLINK-34222
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: Shuai Xu
>Priority: Major
> Fix For: 1.19.0
>
>
> Get minibatch join operator involved in which includes both plan and 
> operator. Implement minibatch join in E2E.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34256) Add a documentation section for minibatch join

2024-01-28 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34256:


 Summary: Add a documentation section for minibatch join
 Key: FLINK-34256
 URL: https://issues.apache.org/jira/browse/FLINK-34256
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.19.0
Reporter: Shuai Xu


We should add a minibatch join section in Performance Tuning to explain the 
usage and principle of minibatch-join.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34222) End to end implementation of minibatch join

2024-01-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-34222:
-
Description: Implement minibatch join in E2E which includes both plan and 
runtime parts.  (was: Get minibatch join operator involved in which includes 
both plan and operator. Implement minibatch join in E2E.)
Summary: End to end implementation of minibatch join  (was: Get 
minibatch join operator involved)

> End to end implementation of minibatch join
> ---
>
> Key: FLINK-34222
> URL: https://issues.apache.org/jira/browse/FLINK-34222
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: Shuai Xu
>Assignee: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Implement minibatch join in E2E which includes both plan and runtime parts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34222) Supports mini-batch for streaming regular join

2024-01-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-34222:
-
Summary: Supports mini-batch for streaming regular join  (was: End to end 
implementation of minibatch join)

> Supports mini-batch for streaming regular join
> --
>
> Key: FLINK-34222
> URL: https://issues.apache.org/jira/browse/FLINK-34222
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: Shuai Xu
>Assignee: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Implement minibatch join in E2E which includes both plan and runtime parts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34300) Release Testing Instructions: Verify FLINK-24024 Support session Window TVF

2024-02-03 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814016#comment-17814016
 ] 

Shuai Xu commented on FLINK-34300:
--

Hi, [~xuyangzhong]. I'd like to take this verification. 

> Release Testing Instructions: Verify FLINK-24024 Support session Window TVF
> ---
>
> Key: FLINK-34300
> URL: https://issues.apache.org/jira/browse/FLINK-34300
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: xuyang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34349) Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch

2024-02-03 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-34349:
-
Description: 
Minibatch join is ready. Users could improve performance in regular stream join 
scenarios. 

Someone can verify this feature by following the 
[doc]([https://github.com/apache/flink/pull/24240)] although it is still being 
reviewed.

If someone finds some bugs about this feature, you open a Jira linked this one 
to report them.

> Release Testing: Verify FLINK-34219 Introduce a new join operator to support 
> minibatch
> --
>
> Key: FLINK-34349
> URL: https://issues.apache.org/jira/browse/FLINK-34349
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Shuai Xu
>Assignee: Shuai Xu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Minibatch join is ready. Users could improve performance in regular stream 
> join scenarios. 
> Someone can verify this feature by following the 
> [doc]([https://github.com/apache/flink/pull/24240)] although it is still 
> being reviewed.
> If someone finds some bugs about this feature, you open a Jira linked this 
> one to report them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-34304) Release Testing Instructions: Verify FLINK-34219 Introduce a new join operator to support minibatch

2024-02-03 Thread Shuai Xu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34304 ]


Shuai Xu deleted comment on FLINK-34304:
--

was (Author: JIRAUSER300096):
Minibatch join is ready. Users could improve performance in regular stream join 
scenarios. 

Someone can verify this feature by following the 
[doc]([https://github.com/apache/flink/pull/24240)] although it is still being 
reviewed.

If someone finds some bugs about this feature, you open a Jira linked this one 
to report them.

> Release Testing Instructions: Verify FLINK-34219 Introduce a new join 
> operator to support minibatch
> ---
>
> Key: FLINK-34304
> URL: https://issues.apache.org/jira/browse/FLINK-34304
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Shuai Xu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34304) Release Testing Instructions: Verify FLINK-34219 Introduce a new join operator to support minibatch

2024-02-03 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814015#comment-17814015
 ] 

Shuai Xu commented on FLINK-34304:
--

Minibatch join is ready. Users could improve performance in regular stream join 
scenarios. 

Someone can verify this feature by following the 
[doc]([https://github.com/apache/flink/pull/24240)] although it is still being 
reviewed.

If someone finds some bugs about this feature, you open a Jira linked this one 
to report them.

> Release Testing Instructions: Verify FLINK-34219 Introduce a new join 
> operator to support minibatch
> ---
>
> Key: FLINK-34304
> URL: https://issues.apache.org/jira/browse/FLINK-34304
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Shuai Xu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34349) Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch

2024-02-03 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34349:


 Summary: Release Testing: Verify FLINK-34219 Introduce a new join 
operator to support minibatch
 Key: FLINK-34349
 URL: https://issues.apache.org/jira/browse/FLINK-34349
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Affects Versions: 1.19.0
Reporter: Shuai Xu
Assignee: Shuai Xu
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34304) Release Testing Instructions: Verify FLINK-34219 Introduce a new join operator to support minibatch

2024-02-04 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814192#comment-17814192
 ] 

Shuai Xu commented on FLINK-34304:
--

This test is opened in another issue FLINK-34349 and this issue would be closed.

> Release Testing Instructions: Verify FLINK-34219 Introduce a new join 
> operator to support minibatch
> ---
>
> Key: FLINK-34304
> URL: https://issues.apache.org/jira/browse/FLINK-34304
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Shuai Xu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-10 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835668#comment-17835668
 ] 

Shuai Xu edited comment on FLINK-34694 at 4/10/24 9:43 AM:
---

Hi [~rovboyko],

The method `otherRecordHasNoAssociationsInInputSide` in your code would be 
invoked for every associatedRecord. This indeed increases the overhead of state 
access. It is difficult to say which one has a greater proportion between the 
increased costs and the reduced expenses of the method 
'updateNumOfAssociations()' you mentioned. Intuitively, this may depend on the 
data distribution itself.

So a detailed test report could better illustrate the problem. And a comparison 
table that covers JOIN keyword in queries of nexmark is good. Besides, rewrite 
sql for hitting this optimization can also indicate the scenarios in which this 
optimization takes effect.


was (Author: JIRAUSER300096):
Hi [~rovboyko],

The method `otherRecordHasNoAssociationsInInputSide` in your code would be 
invoked for every associatedRecord. This indeed increases the overhead of state 
access. It is difficult to say which one has a greater proportion between the 
increased costs and the reduced expenses of the method 
'updateNumOfAssociations()' you mentioned. Intuitively, this may depend on the 
data distribution itself.

So a detailed test report could better illustrate the problem. And a comparison 
table that covers JOIN keyword in queries of nexmark is good. Besides this, 
rewrite sql for hitting this optimization can also indicate the scenarios in 
which this optimization takes effect.

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-10 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835648#comment-17835648
 ] 

Shuai Xu commented on FLINK-34694:
--

Hi [~rovboyko] , your idea looks interesting. Actually I found that this 
optimization does not  reduce the overhead of state access after reading your 
code rather reduces the state to some extent . IMO, the marginal reduction in 
size may not significantly impact the overhead of storage, given that it 
constitutes a small fraction relative to the records held in the state.

BTW, if you plan to pursue this optimization further, could you provide more 
comprehensive benchmark details? The benchmark results of multiple tests and 
overall performance of all queries are convincing.

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-10 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835668#comment-17835668
 ] 

Shuai Xu commented on FLINK-34694:
--

Hi [~rovboyko],

The method `otherRecordHasNoAssociationsInInputSide` in your code would be 
invoked for every associatedRecord. This indeed increases the overhead of state 
access. It is difficult to say which one has a greater proportion between the 
increased costs and the reduced expenses of the method 
'updateNumOfAssociations()' you mentioned. Intuitively, this may depend on the 
data distribution itself.

So a detailed test report could better illustrate the problem. And a comparison 
table that covers JOIN keyword in queries of nexmark is good. Besides this, 
rewrite sql for hitting this optimization can also indicate the scenarios in 
which this optimization takes effect.

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-21 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839464#comment-17839464
 ] 

Shuai Xu commented on FLINK-35184:
--

Hi [~rovboyko] , thx for reporting this bug which is caused by the hashcode() 
in GenericRowData. 
Could you please give a rough explanation of your solutions before implementing 
it?

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839947#comment-17839947
 ] 

Shuai Xu edited comment on FLINK-35184 at 4/23/24 5:46 AM:
---

[~rovboyko] , absolutely, please feel free to start the implementation.


was (Author: JIRAUSER300096):
Absolutely, please feel free to start the implementation.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839947#comment-17839947
 ] 

Shuai Xu commented on FLINK-35184:
--

Absolutely, please feel free to start the implementation.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839920#comment-17839920
 ] 

Shuai Xu commented on FLINK-35184:
--

Hi [~rovboyko] , actually it can't be avoid hash collision even if using 
BinaryRowData which can only reduce the probability to some extent. And the 
solution you mentioned works for me.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35230) Split FlinkSqlParserImplTest to reduce the code lines.

2024-04-24 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840623#comment-17840623
 ] 

Shuai Xu commented on FLINK-35230:
--

[~lsy]  I'd like to take this, would you assign it to me?

> Split FlinkSqlParserImplTest to reduce the code lines.
> --
>
> Key: FLINK-35230
> URL: https://issues.apache.org/jira/browse/FLINK-35230
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API
>Reporter: Feng Jin
>Priority: Major
>
> With the increasing extension of Calcite syntax, the current 
> FlinkSqlParserImplTest has reached nearly 3000 lines of code.
> If it exceeds the current limit, it will result in errors in the code style 
> check.
> {code:java}
> Unable to find source-code formatter for language: log. Available languages 
> are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, 
> groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, 
> perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, 
> yaml08:33:19.679 [ERROR] 
> src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:[1] 
> (sizes) FileLength: File length is 3,166 lines (max allowed is 3,100).
> {code}
> To facilitate future syntax extends, I suggest that we split 
> FlinkSqlParserImplTest and place the same type of syntax in separate Java 
> tests for the convenience of avoiding the continuous growth of the original 
> test class.
> My current idea is: 
> Since *FlinkSqlParserImplTest* currently inherits {*}SqlParserTest{*}, and 
> *SqlParserTest* itself contains many unit tests, for the convenience of 
> future test splits, we should introduce a basic *ParserTestBase* inheriting 
> {*}SqlParserTest{*}, and disable the original related unit tests in 
> {*}SqlParserTest{*}.
> This will facilitate writing relevant unit tests more quickly during 
> subsequent splitting, without the need to repeatedly execute the unit tests 
> inside SqlParserTest.
>  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59113=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-15 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846626#comment-17846626
 ] 

Shuai Xu commented on FLINK-34380:
--

Hi [~rovboyko] , sorry for late reply. 
For the  incorrect order of output records, the minibatch optimization is 
designed to guanrantee final consistency. And the fix you mentioned has been 
considered when the pr was reviewed. Flink is a distributed realtime processing 
system. The order of output could be guanranteed on a node by using 
LinkedHashMap, however, it could not be guranteed when join operator runs on 
multiple nodes. So I think it makes little sense to keep the order here.

For the Rowkind, it was also reviewed. As you mentioned, it is a common problem 
of MiniBatch functionality. It does not influence final result. From the 
benefit perspective, this problem could be tolerable.

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-15 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846626#comment-17846626
 ] 

Shuai Xu edited comment on FLINK-34380 at 5/16/24 1:58 AM:
---

Hi [~rovboyko] , sorry for late reply. 
For the  incorrect order of output records, the minibatch optimization is 
designed to guarantee final consistency. And the fix you mentioned has been 
considered when the pr was reviewed. Flink is a distributed realtime processing 
system. The order of output could be guaranteed on a node by using 
LinkedHashMap, however, it could not be guaranteed when join operator runs on 
multiple nodes. So I think it makes little sense to keep the order here.

For the Rowkind, it was also reviewed. As you mentioned, it is a common problem 
of MiniBatch functionality. It does not influence final result. From the 
benefit perspective, this problem could be tolerable.


was (Author: JIRAUSER300096):
Hi [~rovboyko] , sorry for late reply. 
For the  incorrect order of output records, the minibatch optimization is 
designed to guanrantee final consistency. And the fix you mentioned has been 
considered when the pr was reviewed. Flink is a distributed realtime processing 
system. The order of output could be guanranteed on a node by using 
LinkedHashMap, however, it could not be guranteed when join operator runs on 
multiple nodes. So I think it makes little sense to keep the order here.

For the Rowkind, it was also reviewed. As you mentioned, it is a common problem 
of MiniBatch functionality. It does not influence final result. From the 
benefit perspective, this problem could be tolerable.

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-34500) Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL Sources

2024-02-22 Thread Shuai Xu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34500 ]


Shuai Xu deleted comment on FLINK-34500:
--

was (Author: JIRAUSER300096):
Hi, Could I take this verification?

> Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL 
> Sources
> -
>
> Key: FLINK-34500
> URL: https://issues.apache.org/jira/browse/FLINK-34500
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Parent, Table SQL / API
>Affects Versions: 1.19.0
>Reporter: SuDewei
>Assignee: Yun Tang
>Priority: Blocker
> Fix For: 1.19.0
>
>
> This issue aims to verify 
> [FLIP-367|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150].
> Volunteers can verify it by following the [doc 
> changes|https://github.com/apache/flink/pull/24234]. Since currently only the 
> pre-defined DataGen connector and user-defined connector supports setting 
> source parallelism, volunteers can verify it through DataGen Connector.
> The basic steps include:
> 1. Start a Flink cluster and submit a Flink SQL Job to the cluster.
> 2. In this Flink Job, use the DataGen SQL Connector to generate data.
> 3. Specify the parameter scan.parallelism in DataGen connector options as 
> user-defined parallelism instead of default parallelism.
> 4. Observe whether the parallelism of the source has changed on the job graph 
> of the Flink Application UI, and whether the shuffle mode is correct.
> If everything is normal, you will see that the parallelism of the source 
> operator is indeed different from that of downstream, and the shuffle mode is 
> rebalanced by default.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34500) Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL Sources

2024-02-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819964#comment-17819964
 ] 

Shuai Xu commented on FLINK-34500:
--

Hi, Could I take this verification?

> Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL 
> Sources
> -
>
> Key: FLINK-34500
> URL: https://issues.apache.org/jira/browse/FLINK-34500
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Parent, Table SQL / API
>Affects Versions: 1.19.0
>Reporter: SuDewei
>Assignee: Yun Tang
>Priority: Blocker
> Fix For: 1.19.0
>
>
> This issue aims to verify 
> [FLIP-367|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150].
> Volunteers can verify it by following the [doc 
> changes|https://github.com/apache/flink/pull/24234]. Since currently only the 
> pre-defined DataGen connector and user-defined connector supports setting 
> source parallelism, volunteers can verify it through DataGen Connector.
> The basic steps include:
> 1. Start a Flink cluster and submit a Flink SQL Job to the cluster.
> 2. In this Flink Job, use the DataGen SQL Connector to generate data.
> 3. Specify the parameter scan.parallelism in DataGen connector options as 
> user-defined parallelism instead of default parallelism.
> 4. Observe whether the parallelism of the source has changed on the job graph 
> of the Flink Application UI, and whether the shuffle mode is correct.
> If everything is normal, you will see that the parallelism of the source 
> operator is indeed different from that of downstream, and the shuffle mode is 
> rebalanced by default.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-02-29 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822433#comment-17822433
 ] 

Shuai Xu commented on FLINK-34380:
--

Let me take a look.

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)