[jira] [Comment Edited] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-15 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846626#comment-17846626
 ] 

Shuai Xu edited comment on FLINK-34380 at 5/16/24 1:58 AM:
---

Hi [~rovboyko] , sorry for the late reply. 
Regarding the incorrect order of output records: the minibatch optimization 
is designed to guarantee eventual consistency, and the fix you mentioned was 
considered when the PR was reviewed. Flink is a distributed real-time 
processing system. The output order can be guaranteed on a single node by 
using a LinkedHashMap, but it cannot be guaranteed when the join operator 
runs on multiple nodes, so I think it makes little sense to preserve the 
order here.

Regarding the RowKind, it was also discussed during review. As you mentioned, 
it is a common problem of the MiniBatch functionality. It does not influence 
the final result, and weighed against the performance benefit it is tolerable.


was (Author: JIRAUSER300096):
Hi [~rovboyko] , sorry for the late reply. 
Regarding the incorrect order of output records: the minibatch optimization 
is designed to guarantee eventual consistency, and the fix you mentioned was 
considered when the PR was reviewed. Flink is a distributed real-time 
processing system. The output order can be guaranteed on a single node by 
using a LinkedHashMap, but it cannot be guaranteed when the join operator 
runs on multiple nodes, so I think it makes little sense to preserve the 
order here.

Regarding the RowKind, it was also discussed during review. As you mentioned, 
it is a common problem of the MiniBatch functionality. It does not influence 
the final result, and weighed against the performance benefit it is tolerable.
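For illustration, a minimal hypothetical sketch (not Flink's actual bundle 
implementation) of the point about LinkedHashMap: within one operator 
instance a LinkedHashMap bundle replays keys in first-seen order, but records 
of the same key are grouped together, and across hash-partitioned subtasks no 
global order exists at all.
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BundleOrderSketch {
    public static void main(String[] args) {
        // (joinKey, payload) pairs in arrival order on one subtask
        int[][] arrivals = {{3, 30}, {1, 10}, {3, 31}, {2, 20}};
        // key -> buffered payloads; keys stay in first-seen order here only
        Map<Integer, List<Integer>> bundle = new LinkedHashMap<>();
        for (int[] r : arrivals) {
            bundle.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        // flush emits per key: 3 -> [30, 31], 1 -> [10], 2 -> [20];
        // the arrival of 10 between 30 and 31 is no longer visible.
        bundle.forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
{code}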

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-15 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846626#comment-17846626
 ] 

Shuai Xu commented on FLINK-34380:
--

Hi [~rovboyko] , sorry for the late reply. 
Regarding the incorrect order of output records: the minibatch optimization 
is designed to guarantee eventual consistency, and the fix you mentioned was 
considered when the PR was reviewed. Flink is a distributed real-time 
processing system. The output order can be guaranteed on a single node by 
using a LinkedHashMap, but it cannot be guaranteed when the join operator 
runs on multiple nodes, so I think it makes little sense to preserve the 
order here.

Regarding the RowKind, it was also discussed during review. As you mentioned, 
it is a common problem of the MiniBatch functionality. It does not influence 
the final result, and weighed against the performance benefit it is tolerable.

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35230) Split FlinkSqlParserImplTest to reduce the code lines.

2024-04-24 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840623#comment-17840623
 ] 

Shuai Xu commented on FLINK-35230:
--

[~lsy]  I'd like to take this, would you assign it to me?

> Split FlinkSqlParserImplTest to reduce the code lines.
> --
>
> Key: FLINK-35230
> URL: https://issues.apache.org/jira/browse/FLINK-35230
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API
>Reporter: Feng Jin
>Priority: Major
>
> With the increasing extension of Calcite syntax, the current 
> FlinkSqlParserImplTest has reached nearly 3000 lines of code.
> If it exceeds the current limit, it will result in errors in the code style 
> check.
> {code:java}
> 08:33:19.679 [ERROR] 
> src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:[1] 
> (sizes) FileLength: File length is 3,166 lines (max allowed is 3,100).
> {code}
> To facilitate future syntax extensions, I suggest that we split 
> FlinkSqlParserImplTest and place each type of syntax in a separate Java 
> test class, to avoid the continuous growth of the original test class.
> My current idea is: 
> Since *FlinkSqlParserImplTest* currently inherits {*}SqlParserTest{*}, and 
> *SqlParserTest* itself contains many unit tests, for the convenience of 
> future test splits we should introduce a basic *ParserTestBase* inheriting 
> {*}SqlParserTest{*} and disable the inherited unit tests from 
> {*}SqlParserTest{*} (see the sketch after this description).
> This will make it easier to write the relevant unit tests during 
> subsequent splitting, without repeatedly executing the unit tests 
> inside SqlParserTest.
>  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59113=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb]
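For illustration, a hedged sketch of the proposed base class; the class name 
ParserTestBase comes from the description above, while the disabled method 
shown (and its name) is purely an illustrative assumption:
{code:java}
// Inherit Calcite's SqlParserTest machinery once, and disable its inherited
// unit tests so each per-syntax Flink test class does not re-run them.
import org.apache.calcite.sql.parser.SqlParserTest;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public abstract class ParserTestBase extends SqlParserTest {
    // One such override per inherited test; the method name is hypothetical.
    @Disabled("already covered once; skip in the split Flink parser tests")
    @Test
    @Override
    public void testSelect() {}
}
{code}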



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839947#comment-17839947
 ] 

Shuai Xu commented on FLINK-35184:
--

Absolutely, please feel free to start the implementation.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839947#comment-17839947
 ] 

Shuai Xu edited comment on FLINK-35184 at 4/23/24 5:46 AM:
---

[~rovboyko] , absolutely, please feel free to start the implementation.


was (Author: JIRAUSER300096):
Absolutely, please feel free to start the implementation.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839920#comment-17839920
 ] 

Shuai Xu commented on FLINK-35184:
--

Hi [~rovboyko] , actually the hash collision cannot be avoided even by using 
BinaryRowData, which would only reduce its probability to some extent. The 
solution you mentioned works for me.
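For illustration, the reproducing test pairs 1L with 4294967296L precisely 
because their hashes collide: java.lang.Long hashes a value as 
(int) (value ^ (value >>> 32)), and the GenericRowData row hash is derived 
from its field hash codes. A minimal demo (plain Java, not Flink code):
{code:java}
public class HashCollisionDemo {
    public static void main(String[] args) {
        // 4294967296L is 2^32: 2^32 ^ (2^32 >>> 32) = 0x100000001, which
        // truncates to the int 1 -- the same hash code as 1L.
        System.out.println(Long.hashCode(1L));          // prints 1
        System.out.println(Long.hashCode(4294967296L)); // also prints 1
    }
}
{code}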

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-21 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839464#comment-17839464
 ] 

Shuai Xu commented on FLINK-35184:
--

Hi [~rovboyko] , thanks for reporting this bug, which is caused by the 
hashCode() implementation in GenericRowData. 
Could you please give a rough explanation of your solution before 
implementing it?

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-10 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835668#comment-17835668
 ] 

Shuai Xu edited comment on FLINK-34694 at 4/10/24 9:43 AM:
---

Hi [~rovboyko],

The method `otherRecordHasNoAssociationsInInputSide` in your code would be 
invoked for every associated record, which does increase the overhead of 
state access. It is hard to say whether that added cost outweighs the savings 
from the removed `updateNumOfAssociations()` calls you mentioned; intuitively, 
this may depend on the data distribution itself.

So a detailed test report would illustrate the trade-off better. A comparison 
table covering the Nexmark queries that contain the JOIN keyword would be 
good, and rewriting SQL to hit this optimization would also show the 
scenarios in which it takes effect.


was (Author: JIRAUSER300096):
Hi [~rovboyko],

The method `otherRecordHasNoAssociationsInInputSide` in your code would be 
invoked for every associated record, which does increase the overhead of 
state access. It is hard to say whether that added cost outweighs the savings 
from the removed `updateNumOfAssociations()` calls you mentioned; intuitively, 
this may depend on the data distribution itself.

So a detailed test report would illustrate the trade-off better. A comparison 
table covering the Nexmark queries that contain the JOIN keyword would be 
good, and rewriting SQL to hit this optimization would also show the 
scenarios in which it takes effect.
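For illustration, a minimal hypothetical sketch (names are illustrative, not 
Flink's actual OuterJoinRecordStateView API) of what the association counter 
under discussion enables: retracting the null-padded row exactly when the 
first match arrives, and restoring it when the last match is revoked.
{code:java}
import java.util.HashMap;
import java.util.Map;

public class AssociationCounterSketch {
    // per-key number of associations, as kept next to the outer-join state
    static final Map<Integer, Integer> numAssociations = new HashMap<>();

    static void onMatchAdded(int key) {
        if (numAssociations.merge(key, 1, Integer::sum) == 1) {
            System.out.println("-D [key=" + key + ", null]"); // first match
        }
        System.out.println("+I [key=" + key + ", joinedRow]");
    }

    static void onMatchRevoked(int key) {
        System.out.println("-D [key=" + key + ", joinedRow]");
        if (numAssociations.merge(key, -1, Integer::sum) == 0) {
            System.out.println("+I [key=" + key + ", null]"); // last revoke
        }
    }

    public static void main(String[] args) {
        onMatchAdded(1);   // retracts the null padding, emits the join row
        onMatchRevoked(1); // retracts the join row, restores the padding
    }
}
{code}
The proposal drops this counter and instead probes the other side's state, 
trading stored bytes for the extra state reads discussed above.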

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-10 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835668#comment-17835668
 ] 

Shuai Xu commented on FLINK-34694:
--

Hi [~rovboyko],

The method `otherRecordHasNoAssociationsInInputSide` in your code would be 
invoked for every associated record, which does increase the overhead of 
state access. It is hard to say whether that added cost outweighs the savings 
from the removed `updateNumOfAssociations()` calls you mentioned; intuitively, 
this may depend on the data distribution itself.

So a detailed test report would illustrate the trade-off better. A comparison 
table covering the Nexmark queries that contain the JOIN keyword would be 
good, and rewriting SQL to hit this optimization would also show the 
scenarios in which it takes effect.

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-10 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835648#comment-17835648
 ] 

Shuai Xu commented on FLINK-34694:
--

Hi [~rovboyko] , your idea looks interesting. Actually, after reading your 
code I found that this optimization does not reduce the overhead of state 
access; rather, it reduces the state size to some extent. IMO, the marginal 
reduction in size may not significantly impact the storage overhead, given 
that it constitutes a small fraction relative to the records held in the 
state.

BTW, if you plan to pursue this optimization further, could you provide more 
comprehensive benchmark details? Benchmark results from multiple runs and the 
overall performance across all queries would be convincing.

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-02-29 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822433#comment-17822433
 ] 

Shuai Xu commented on FLINK-34380:
--

Let me take a look.

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-34500) Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL Sources

2024-02-22 Thread Shuai Xu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34500 ]


Shuai Xu deleted comment on FLINK-34500:
--

was (Author: JIRAUSER300096):
Hi, could I take this verification?

> Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL 
> Sources
> -
>
> Key: FLINK-34500
> URL: https://issues.apache.org/jira/browse/FLINK-34500
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Parent, Table SQL / API
>Affects Versions: 1.19.0
>Reporter: SuDewei
>Assignee: Yun Tang
>Priority: Blocker
> Fix For: 1.19.0
>
>
> This issue aims to verify 
> [FLIP-367|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150].
> Volunteers can verify it by following the [doc 
> changes|https://github.com/apache/flink/pull/24234]. Since currently only the 
> pre-defined DataGen connector and user-defined connectors support setting 
> source parallelism, volunteers can verify it through the DataGen connector.
> The basic steps include:
> 1. Start a Flink cluster and submit a Flink SQL Job to the cluster.
> 2. In this Flink Job, use the DataGen SQL Connector to generate data.
> 3. Specify the parameter scan.parallelism in DataGen connector options as 
> user-defined parallelism instead of default parallelism.
> 4. Observe whether the parallelism of the source has changed on the job graph 
> of the Flink Application UI, and whether the shuffle mode is correct.
> If everything is normal, you will see that the parallelism of the source 
> operator is indeed different from that of downstream, and the shuffle mode is 
> rebalanced by default.
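For illustration, a minimal sketch of steps 2 and 3, assuming the FLIP-367 
option key 'scan.parallelism' named in the description:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ScanParallelismCheck {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // DataGen source with a user-defined source parallelism of 2
        tEnv.executeSql(
                "CREATE TABLE gen (a INT, b STRING) WITH ("
                        + " 'connector' = 'datagen',"
                        + " 'rows-per-second' = '10',"
                        + " 'scan.parallelism' = '2')");
        // When submitted to a cluster, check in the web UI that the source
        // vertex runs with parallelism 2 and rebalances to downstream tasks.
        tEnv.executeSql("SELECT * FROM gen").print();
    }
}
{code}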



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34500) Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL Sources

2024-02-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819964#comment-17819964
 ] 

Shuai Xu commented on FLINK-34500:
--

Hi, could I take this verification?

> Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL 
> Sources
> -
>
> Key: FLINK-34500
> URL: https://issues.apache.org/jira/browse/FLINK-34500
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Parent, Table SQL / API
>Affects Versions: 1.19.0
>Reporter: SuDewei
>Assignee: Yun Tang
>Priority: Blocker
> Fix For: 1.19.0
>
>
> This issue aims to verify 
> [FLIP-367|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150].
> Volunteers can verify it by following the [doc 
> changes|https://github.com/apache/flink/pull/24234]. Since currently only the 
> pre-defined DataGen connector and user-defined connectors support setting 
> source parallelism, volunteers can verify it through the DataGen connector.
> The basic steps include:
> 1. Start a Flink cluster and submit a Flink SQL Job to the cluster.
> 2. In this Flink Job, use the DataGen SQL Connector to generate data.
> 3. Specify the parameter scan.parallelism in DataGen connector options as 
> user-defined parallelism instead of default parallelism.
> 4. Observe whether the parallelism of the source has changed on the job graph 
> of the Flink Application UI, and whether the shuffle mode is correct.
> If everything is normal, you will see that the parallelism of the source 
> operator is indeed different from that of downstream, and the shuffle mode is 
> rebalanced by default.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34219) Introduce a new join operator to support minibatch

2024-02-20 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-34219:
-
Release Note: Support minibatch regular join to reduce intermediate results 
and resolve record amplification in cascading join scenarios.

> Introduce a new join operator to support minibatch
> --
>
> Key: FLINK-34219
> URL: https://issues.apache.org/jira/browse/FLINK-34219
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>    Reporter: Shuai Xu
>Assignee: Shuai Xu
>Priority: Major
> Fix For: 1.19.0
>
>
> This is the parent task of FLIP-415.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34346) Release Testing: Verify FLINK-24024 Support session Window TVF

2024-02-19 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818612#comment-17818612
 ] 

Shuai Xu commented on FLINK-34346:
--

Hi, I have finished this testing. The exception that I think could be 
improved has been linked to this Jira.

> Release Testing: Verify FLINK-24024 Support session Window TVF
> --
>
> Key: FLINK-34346
> URL: https://issues.apache.org/jira/browse/FLINK-34346
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: xuyang
>Assignee: Shuai Xu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Session window TVF is ready. Users can use Session window TVF aggregation 
> instead of using legacy session group window aggregation.
> Someone can verify this feature by following the 
> [doc|https://github.com/apache/flink/pull/24250], although it is still 
> being reviewed. 
> Furthermore, although session window join, session window rank and session 
> window deduplicate are in an experimental state, if someone finds bugs in 
> them, you could also open a Jira linked to this one to report them.
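For reference, a hedged example of the session window TVF aggregation being 
verified, assuming a TableEnvironment tEnv and a hypothetical Bid table with 
columns item, price and an event-time attribute bidtime:
{code:java}
// Per-item session windows that close after a 5-minute gap; window TVF
// aggregations must group by window_start and window_end.
tEnv.executeSql(
        "SELECT item, window_start, window_end, SUM(price) AS total "
                + "FROM TABLE(SESSION(TABLE Bid PARTITION BY item, "
                + "DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) "
                + "GROUP BY item, window_start, window_end").print();
{code}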



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34355) Release Testing: Verify FLINK-34054 Support named parameters for functions and procedures

2024-02-19 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818611#comment-17818611
 ] 

Shuai Xu commented on FLINK-34355:
--

Hi, I have finished this verification.

> Release Testing: Verify FLINK-34054 Support named parameters for functions 
> and procedures
> -
>
> Key: FLINK-34355
> URL: https://issues.apache.org/jira/browse/FLINK-34355
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Shuai Xu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Test suggestion:
> 1. Implement a test UDF or Procedure and support Named Parameters.
> 2. When calling a function or procedure, use named parameters to verify if 
> the results are as expected.
> You can test the following scenarios:
> 1. Normal usage of named parameters, fully specifying each parameter.
> 2. Omitting unnecessary parameters.
> 3. Omitting necessary parameters to confirm if an error is reported.
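For illustration, a hedged sketch of such a test UDF, assuming the 
@ArgumentHint annotation introduced by FLINK-34054 (attribute names as in the 
1.19 docs; treat this as a sketch, not the definitive API):
{code:java}
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.functions.ScalarFunction;

public class GreetFunction extends ScalarFunction {
    // 'name' is required; 'greeting' may be omitted and then arrives as null.
    public String eval(
            @ArgumentHint(name = "name") String name,
            @ArgumentHint(name = "greeting", isOptional = true) String greeting) {
        return (greeting == null ? "Hello" : greeting) + ", " + name;
    }
}
{code}
Calling it with named parameters, e.g. SELECT GreetFunction(name => 'Flink'), 
should fall back to the default greeting, while omitting the required name 
should raise a validation error.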



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34462) Session window with negative parameter throws unclear exception

2024-02-19 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34462:


 Summary: Session window with negative parameter throws unclear 
exception
 Key: FLINK-34462
 URL: https://issues.apache.org/jira/browse/FLINK-34462
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.19.0
Reporter: Shuai Xu


Setting an invalid parameter in a session window produces an unclear error.
{code:java}
// add test in WindowAggregateITCase
def testEventTimeSessionWindowWithInvalidName(): Unit = {
  val sql =
"""
  |SELECT
  |  window_start,
  |  window_end,
  |  COUNT(*),
  |  SUM(`bigdec`),
  |  MAX(`double`),
  |  MIN(`float`),
  |  COUNT(DISTINCT `string`),
  |  concat_distinct_agg(`string`)
  |FROM TABLE(
  |   SESSION(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '-5' SECOND))
  |GROUP BY window_start, window_end
""".stripMargin

  val sink = new TestingAppendSink
  tEnv.sqlQuery(sql).toDataStream.addSink(sink)
  env.execute()
} 

{code}
{code:java}
java.lang.AssertionError: Sql optimization: Assertion error: null at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:79)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
 at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) 
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
 at scala.collection.Iterator.foreach(Iterator.scala:937) at 
scala.collection.Iterator.foreach$(Iterator.scala:937) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at 
scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:69) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:320)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:178)
 at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
 at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
 at 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:151)
 at 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:128)
 at 
org.apache.flink.table.api.bridge.scala.TableConversions.toDataStream(TableConversions.scala:60)
 at 
org.apache.flink.table.planner.runtime.stream.sql.WindowAggregateITCase.testEventTimeSessionWindowWithInvalidName(WindowAggregateITCase.scala:1239)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at 
java.util.Iterator.forEachRemaining(Iterator.java:116) at 
scala.collection.convert.Wrappers$IteratorWrapper.forEachRemaining(Wrappers.scala:26)
 at 
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) 
at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
 at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at 
java.util.st

[jira] [Commented] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-19 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818384#comment-17818384
 ] 

Shuai Xu commented on FLINK-34378:
--

Hi [~xuyangzhong] . This is expected behavior. To maintain order, additional 
data structures would need to be introduced, which would degrade performance, 
and the ordering would only materialize when parallelism is set to 1. If 
order preservation is required with a parallelism of 1, it suffices to simply 
turn off the minibatch feature, as sketched below.
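A minimal sketch of turning the feature off, reusing the option key that the 
reproducing test sets to true:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

public class DisableMiniBatch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // With mini-batch off, a parallelism-1 join emits in arrival order.
        tEnv.getConfig()
                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, false);
    }
}
{code}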

> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> I'm not sure if it's a bug. The following case can re-produce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
> s"""
>|CREATE TABLE t1 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
> s"""
>|CREATE TABLE t2 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}
> Result
> {code:java}
> ++---+---+---+---+ 
> | op | a | b | a0| b0| 
> ++---+---+---+---+ 
> | +I | 3 | 3 | 3 | 3 | 
> | +I | 7 | 7 | 7 | 7 | 
> | +I | 2 | 2 | 2 | 2 | 
> | +I | 5 | 5 | 5 | 5 | 
> | +I | 1 | 1 | 1 | 1 | 
> | +I | 6 | 6 | 6 | 6 | 
> | +I | 4 | 4 | 4 | 4 | 
> | +I | 8 | 8 | 8 | 8 | 
> ++---+---+---+---+
> {code}
> When I do not use minibatch join, the result is :
> {code:java}
> ++---+---+++
> | op | a | b | a0 | b0 |
> ++---+---+++
> | +I | 1 | 1 |  1 |  1 |
> | +I | 2 | 2 |  2 |  2 |
> | +I | 3 | 3 |  3 |  3 |
> | +I | 4 | 4 |  4 |  4 |
> | +I | 5 | 5 |  5 |  5 |
> | +I | 6 | 6 |  6 |  6 |
> | +I | 7 | 7 |  7 |  7 |
> | +I | 8 | 8 |  8 |  8 |
> ++---+---+++
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34355) Release Testing: Verify FLINK-34054 Support named parameters for functions and procedures

2024-02-04 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814193#comment-17814193
 ] 

Shuai Xu edited comment on FLINK-34355 at 2/5/24 7:19 AM:
--

Hi, I'd like to take this verification. cc [~hackergin] .


was (Author: JIRAUSER300096):
Hi, I'd like to take this verification.

> Release Testing: Verify FLINK-34054 Support named parameters for functions 
> and procedures
> -
>
> Key: FLINK-34355
> URL: https://issues.apache.org/jira/browse/FLINK-34355
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Test suggestion:
> 1. Implement a test UDF or Procedure and support Named Parameters.
> 2. When calling a function or procedure, use named parameters to verify if 
> the results are as expected.
> You can test the following scenarios:
> 1. Normal usage of named parameters, fully specifying each parameter.
> 2. Omitting unnecessary parameters.
> 3. Omitting necessary parameters to confirm if an error is reported.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34355) Release Testing: Verify FLINK-34054 Support named parameters for functions and procedures

2024-02-04 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814193#comment-17814193
 ] 

Shuai Xu commented on FLINK-34355:
--

Hi, I'd like to take this verification.

> Release Testing: Verify FLINK-34054 Support named parameters for functions 
> and procedures
> -
>
> Key: FLINK-34355
> URL: https://issues.apache.org/jira/browse/FLINK-34355
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Test suggestion:
> 1. Implement a test UDF or Procedure and support Named Parameters.
> 2. When calling a function or procedure, use named parameters to verify if 
> the results are as expected.
> You can test the following scenarios:
> 1. Normal usage of named parameters, fully specifying each parameter.
> 2. Omitting unnecessary parameters.
> 3. Omitting necessary parameters to confirm if an error is reported.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34304) Release Testing Instructions: Verify FLINK-34219 Introduce a new join operator to support minibatch

2024-02-04 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814192#comment-17814192
 ] 

Shuai Xu commented on FLINK-34304:
--

This testing is covered by another issue, FLINK-34349, so this issue will be 
closed.

> Release Testing Instructions: Verify FLINK-34219 Introduce a new join 
> operator to support minibatch
> ---
>
> Key: FLINK-34304
> URL: https://issues.apache.org/jira/browse/FLINK-34304
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Shuai Xu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-34304) Release Testing Instructions: Verify FLINK-34219 Introduce a new join operator to support minibatch

2024-02-03 Thread Shuai Xu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34304 ]


Shuai Xu deleted comment on FLINK-34304:
--

was (Author: JIRAUSER300096):
Minibatch join is ready. Users can improve performance in regular stream join 
scenarios. 

Someone can verify this feature by following the 
[doc|https://github.com/apache/flink/pull/24240], although it is still being 
reviewed.

If someone finds bugs in this feature, please open a Jira linked to this one 
to report them.

> Release Testing Instructions: Verify FLINK-34219 Introduce a new join 
> operator to support minibatch
> ---
>
> Key: FLINK-34304
> URL: https://issues.apache.org/jira/browse/FLINK-34304
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Shuai Xu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34349) Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch

2024-02-03 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-34349:
-
Description: 
Minibatch join is ready. Users can improve performance in regular stream join 
scenarios. 

Someone can verify this feature by following the 
[doc|https://github.com/apache/flink/pull/24240], although it is still being 
reviewed.

If someone finds bugs in this feature, please open a Jira linked to this one 
to report them.

> Release Testing: Verify FLINK-34219 Introduce a new join operator to support 
> minibatch
> --
>
> Key: FLINK-34349
> URL: https://issues.apache.org/jira/browse/FLINK-34349
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Shuai Xu
>Assignee: Shuai Xu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Minibatch join is ready. Users can improve performance in regular stream 
> join scenarios. 
> Someone can verify this feature by following the 
> [doc|https://github.com/apache/flink/pull/24240], although it is still 
> being reviewed.
> If someone finds bugs in this feature, please open a Jira linked to this 
> one to report them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34349) Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch

2024-02-03 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34349:


 Summary: Release Testing: Verify FLINK-34219 Introduce a new join 
operator to support minibatch
 Key: FLINK-34349
 URL: https://issues.apache.org/jira/browse/FLINK-34349
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Affects Versions: 1.19.0
Reporter: Shuai Xu
Assignee: Shuai Xu
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34300) Release Testing Instructions: Verify FLINK-24024 Support session Window TVF

2024-02-03 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814016#comment-17814016
 ] 

Shuai Xu commented on FLINK-34300:
--

Hi, [~xuyangzhong]. I'd like to take this verification. 

> Release Testing Instructions: Verify FLINK-24024 Support session Window TVF
> ---
>
> Key: FLINK-34300
> URL: https://issues.apache.org/jira/browse/FLINK-34300
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: xuyang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34304) Release Testing Instructions: Verify FLINK-34219 Introduce a new join operator to support minibatch

2024-02-03 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814015#comment-17814015
 ] 

Shuai Xu commented on FLINK-34304:
--

Minibatch join is ready. Users can improve performance in regular stream join 
scenarios. 

Someone can verify this feature by following the 
[doc|https://github.com/apache/flink/pull/24240], although it is still being 
reviewed.

If someone finds bugs in this feature, please open a Jira linked to this one 
to report them.

> Release Testing Instructions: Verify FLINK-34219 Introduce a new join 
> operator to support minibatch
> ---
>
> Key: FLINK-34304
> URL: https://issues.apache.org/jira/browse/FLINK-34304
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Shuai Xu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34222) Supports mini-batch for streaming regular join

2024-01-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-34222:
-
Summary: Supports mini-batch for streaming regular join  (was: End to end 
implementation of minibatch join)

> Supports mini-batch for streaming regular join
> --
>
> Key: FLINK-34222
> URL: https://issues.apache.org/jira/browse/FLINK-34222
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>    Reporter: Shuai Xu
>Assignee: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Implement minibatch join in E2E which includes both plan and runtime parts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34222) End to end implementation of minibatch join

2024-01-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-34222:
-
Description: Implement minibatch join in E2E which includes both plan and 
runtime parts.  (was: Get minibatch join operator involved in which includes 
both plan and operator. Implement minibatch join in E2E.)
Summary: End to end implementation of minibatch join  (was: Get 
minibatch join operator involved)

> End to end implementation of minibatch join
> ---
>
> Key: FLINK-34222
> URL: https://issues.apache.org/jira/browse/FLINK-34222
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>    Reporter: Shuai Xu
>Assignee: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Implement minibatch join in E2E which includes both plan and runtime parts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34256) Add a documentation section for minibatch join

2024-01-28 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34256:


 Summary: Add a documentation section for minibatch join
 Key: FLINK-34256
 URL: https://issues.apache.org/jira/browse/FLINK-34256
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.19.0
Reporter: Shuai Xu


We should add a minibatch join section to the Performance Tuning docs to 
explain the usage and principle of minibatch join.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34222) Get minibatch join operator involved

2024-01-25 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-34222:
-
Description: Get the minibatch join operator involved, covering both the 
plan and the operator, and implement minibatch join end to end.  (was: Get 
minibatch join operator involved which includes both plan and operator)

> Get minibatch join operator involved
> 
>
> Key: FLINK-34222
> URL: https://issues.apache.org/jira/browse/FLINK-34222
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>    Reporter: Shuai Xu
>Priority: Major
> Fix For: 1.19.0
>
>
> Get the minibatch join operator involved, covering both the plan and the 
> operator, and implement minibatch join end to end.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34222) Get minibatch join operator involved

2024-01-23 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34222:


 Summary: Get minibatch join operator involved
 Key: FLINK-34222
 URL: https://issues.apache.org/jira/browse/FLINK-34222
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Shuai Xu
 Fix For: 1.19.0


Get the minibatch join operator involved, which includes both the plan and 
the operator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34221) Introduce operator for minibatch join

2024-01-23 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34221:


 Summary: Introduce operator for minibatch join
 Key: FLINK-34221
 URL: https://issues.apache.org/jira/browse/FLINK-34221
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Shuai Xu
 Fix For: 1.19.0


Introduce an operator that implements the minibatch join.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34220) introduce buffer bundle for minibatch join

2024-01-23 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34220:


 Summary: introduce buffer bundle for minibatch join
 Key: FLINK-34220
 URL: https://issues.apache.org/jira/browse/FLINK-34220
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Shuai Xu
 Fix For: 1.19.0


Introduce a buffer bundle for storing records to implement minibatch join.
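
A hypothetical sketch of what the bundle buys beyond plain buffering: an 
addition immediately cancelled by its own retraction can fold away inside the 
bundle, so the join never touches state for the pair (RowKind strings stand in 
for Flink's RowKind enum):
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

class BufferBundleSketch {
    // Each entry is {rowKind, payload}; a real bundle would key this by join key.
    private final Deque<String[]> records = new ArrayDeque<>();

    void add(String rowKind, String payload) {
        String[] last = records.peekLast();
        boolean cancels = last != null
                && last[1].equals(payload)
                && (last[0].equals("+I") || last[0].equals("+U"))
                && (rowKind.equals("-D") || rowKind.equals("-U"));
        if (cancels) {
            records.pollLast(); // +I/+U followed by its matching -D/-U folds away
        } else {
            records.addLast(new String[] {rowKind, payload});
        }
    }
}
{code}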



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34219) Introduce a new join operator to support minibatch

2024-01-23 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34219:


 Summary: Introduce a new join operator to support minibatch
 Key: FLINK-34219
 URL: https://issues.apache.org/jira/browse/FLINK-34219
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Runtime
Reporter: Shuai Xu
 Fix For: 1.19.0


This is the parent task of FLIP-415.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33689) JsonObjectAggFunction can't retract previous data which is invalid when enabling local global agg

2023-11-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-33689:
-
Summary: JsonObjectAggFunction can't retract previous data which is invalid 
when enabling local global agg  (was: JsonObjectAggFunction can't retract 
previous data which is invalid when enable local global agg)

> JsonObjectAggFunction can't retract previous data which is invalid when 
> enabling local global agg
> -
>
> Key: FLINK-33689
> URL: https://issues.apache.org/jira/browse/FLINK-33689
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
>
> Run the following test with LocalGlobal enabled in sql/AggregateITCase.
> {code:java}
> def testGroupJsonObjectAggWithRetract(): Unit = {
>   val data = new mutable.MutableList[(Long, String, Long)]
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   val sql =
> s"""
>|select
>|   JSON_OBJECTAGG(key k value v)
>|from (select
>| cast(SUM(a) as string) as k,t as v
>|   from
>| Table6
>|   group by t)
>|""".stripMargin
>   val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
>   tEnv.createTemporaryView("Table6", t)
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
>   env.execute()
>   val expected =
> List(
>   "{\"30\":2}"
> )
>   assertThat(sink.getRetractResults).isEqualTo(expected)
> } {code}
> The result is as follows.
> {code:java}
> List({"14":2,"30":2}) {code}
> However, \{"14":2} should be retracted.
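
To make the expected behavior concrete, here is a minimal hypothetical 
accumulator (not the actual JsonObjectAggFunction) illustrating the contract: 
when the grouped key flips from "14" to "30", the function receives 
retract("14", 2) and must drop the stale entry:
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

class JsonObjectAccSketch {
    final Map<String, Integer> entries = new LinkedHashMap<>();

    void accumulate(String k, Integer v) {
        entries.put(k, v);
    }

    // Without this step the stale entry {"14":2} survives, which is the bug above.
    void retract(String k, Integer v) {
        entries.remove(k);
    }
}
{code}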



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33689) JsonObjectAggFunction can't retract previous data which is invalid when enable local global agg

2023-11-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-33689:
-
Summary: JsonObjectAggFunction can't retract previous data which is invalid 
when enable local global agg  (was: jsonObjectAggFunction can't retract 
previous data which is invalid when enable local global agg)

> JsonObjectAggFunction can't retract previous data which is invalid when 
> enable local global agg
> ---
>
> Key: FLINK-33689
> URL: https://issues.apache.org/jira/browse/FLINK-33689
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
>
> Run the following test with LocalGlobal enabled in sql/AggregateITCase.
> {code:java}
> def testGroupJsonObjectAggWithRetract(): Unit = {
>   val data = new mutable.MutableList[(Long, String, Long)]
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   val sql =
> s"""
>|select
>|   JSON_OBJECTAGG(key k value v)
>|from (select
>| cast(SUM(a) as string) as k,t as v
>|   from
>| Table6
>|   group by t)
>|""".stripMargin
>   val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
>   tEnv.createTemporaryView("Table6", t)
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
>   env.execute()
>   val expected =
> List(
>   "{\"30\":2}"
> )
>   assertThat(sink.getRetractResults).isEqualTo(expected)
> } {code}
> The result is as follows.
> {code:java}
> List({"14":2,"30":2}) {code}
> However, \{"14":2} should be retracted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33689) jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg

2023-11-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-33689:
-
Description: 
Run the following test with LocalGlobal and minibatch enabled in 
sql/AggregateITCase.
{code:java}
def testGroupJsonObjectAggWithRetract(): Unit = {
  val data = new mutable.MutableList[(Long, String, Long)]
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  val sql =
s"""
   |select
   |   JSON_OBJECTAGG(key k value v)
   |from (select
   | cast(SUM(a) as string) as k,t as v
   |   from
   | Table6
   |   group by t)
   |""".stripMargin
  val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
  tEnv.createTemporaryView("Table6", t)
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
  val expected =
List(
  "{\"30\":2}"
)
  assertThat(sink.getRetractResults).isEqualTo(expected)
} {code}
The result is as follows.
{code:java}
List({"14":2,"30":2}) {code}
However, \{"14":2} should be retracted.

  was:
Run the following test with LocalGlobal and minibatch enabled in 
sql/AggregateITCase.
{code:java}
// code placeholder
def testGroupJsonObjectAggWithRetract(): Unit = {
  val data = new mutable.MutableList[(Long, String, Long)]
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  val sql =
s"""
   |select
   |   JSON_OBJECTAGG(key k value v)
   |from (select
   | cast(SUM(a) as string) as k,t as v
   |   from
   | Table6
   |   group by t)
   |""".stripMargin
  val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
  tEnv.createTemporaryView("Table6", t)
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
  val expected =
List(
  "{\"30\":2}"
)
  assertThat(sink.getRetractResults).isEqualTo(expected)
} {code}
The result is as follows.
{code:java}
// code placeholder
List({"14":2,"30":2}) {code}
However, \{"14":2} should be retracted.


> jsonObjectAggFunction can't retract previous data which is invalid when 
> enable local global agg
> ---
>
> Key: FLINK-33689
> URL: https://issues.apache.org/jira/browse/FLINK-33689
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
>
> Run the following test with LocalGlobal and minibatch enabled in 
> sql/AggregateITCase.
> {code:java}
> def testGroupJsonObjectAggWithRetract(): Unit = {
>   val data = new mutable.MutableList[(Long, String, Long)]
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   val sql =
> s"""
>|select
>|   JSON_OBJECTAGG(key k value v)
>|from (select
>| cast(SUM(a) as string) as k,t as v
>|   from
>| Table6
>|   group by t)
>|""".stripMargin
>   val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
>   tEnv.createTemporaryView("Table6", t)
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
>   env.execute()
>   val expected =
> List(
>   "{\"30\":2}"
> )
>   assertThat(sink.getRetractResults).isEqualTo(expected)
> } {code}
> The result is as follows.
> {code:java}
> List({"14":2,"30":2}) {code}
> However, \{"14":2} should be retracted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33689) jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg

2023-11-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-33689:
-
Description: 
Run the following test with LocalGlobal enabled in sql/AggregateITCase.
{code:java}
def testGroupJsonObjectAggWithRetract(): Unit = {
  val data = new mutable.MutableList[(Long, String, Long)]
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  val sql =
s"""
   |select
   |   JSON_OBJECTAGG(key k value v)
   |from (select
   | cast(SUM(a) as string) as k,t as v
   |   from
   | Table6
   |   group by t)
   |""".stripMargin
  val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
  tEnv.createTemporaryView("Table6", t)
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
  val expected =
List(
  "{\"30\":2}"
)
  assertThat(sink.getRetractResults).isEqualTo(expected)
} {code}
The result is as follows.
{code:java}
List({"14":2,"30":2}) {code}
However, \{"14":2} should be retracted.

  was:
Run the following test with LocalGlobal and minibatch enabled in 
sql/AggregateITCase.
{code:java}
def testGroupJsonObjectAggWithRetract(): Unit = {
  val data = new mutable.MutableList[(Long, String, Long)]
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  val sql =
s"""
   |select
   |   JSON_OBJECTAGG(key k value v)
   |from (select
   | cast(SUM(a) as string) as k,t as v
   |   from
   | Table6
   |   group by t)
   |""".stripMargin
  val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
  tEnv.createTemporaryView("Table6", t)
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
  val expected =
List(
  "{\"30\":2}"
)
  assertThat(sink.getRetractResults).isEqualTo(expected)
} {code}
The result is as follows.
{code:java}
List({"14":2,"30":2}) {code}
However, \{"14":2} should be retracted.


> jsonObjectAggFunction can't retract previous data which is invalid when 
> enable local global agg
> ---
>
> Key: FLINK-33689
> URL: https://issues.apache.org/jira/browse/FLINK-33689
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
>
> Run the following test with LocalGlobal enabled in sql/AggregateITCase.
> {code:java}
> def testGroupJsonObjectAggWithRetract(): Unit = {
>   val data = new mutable.MutableList[(Long, String, Long)]
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   val sql =
> s"""
>|select
>|   JSON_OBJECTAGG(key k value v)
>|from (select
>| cast(SUM(a) as string) as k,t as v
>|   from
>| Table6
>|   group by t)
>|""".stripMargin
>   val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
>   tEnv.createTemporaryView("Table6", t)
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
>   env.execute()
>   val expected =
> List(
>   "{\"30\":2}"
> )
>   assertThat(sink.getRetractResults).isEqualTo(expected)
> } {code}
> The result is as follows.
> {code:java}
> List({"14":2,"30":2}) {code}
> However, \{"14":2} should be retracted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33689) jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg

2023-11-28 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-33689:


 Summary: jsonObjectAggFunction can't retract previous data which 
is invalid when enable local global agg
 Key: FLINK-33689
 URL: https://issues.apache.org/jira/browse/FLINK-33689
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.18.0
Reporter: Shuai Xu


Run the following test with LocalGlobal and minibatch enabled in 
sql/AggregateITCase.
{code:java}
// code placeholder
def testGroupJsonObjectAggWithRetract(): Unit = {
  val data = new mutable.MutableList[(Long, String, Long)]
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  val sql =
s"""
   |select
   |   JSON_OBJECTAGG(key k value v)
   |from (select
   | cast(SUM(a) as string) as k,t as v
   |   from
   | Table6
   |   group by t)
   |""".stripMargin
  val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
  tEnv.createTemporaryView("Table6", t)
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
  val expected =
List(
  "{\"30\":2}"
)
  assertThat(sink.getRetractResults).isEqualTo(expected)
} {code}
The result is as follows.
{code:java}
// code placeholder
List({"14":2,"30":2}) {code}
However, \{"14":2} should be retracted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33549) Exception "Factory does not implement interface YieldingOperatorFactory" thrown in batch mode

2023-11-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788967#comment-17788967
 ] 

Shuai Xu edited comment on FLINK-33549 at 11/23/23 3:39 AM:


Hi [~luoyuxia], I'd like to fix this. Could you assign it to me?


was (Author: JIRAUSER300096):
I'd like to fix this. Could you assign it to me? [~luoyuxia] 

> Exception "Factory does not implement interface YieldingOperatorFactory" 
> thrown in batch mode 
> --
>
> Key: FLINK-33549
> URL: https://issues.apache.org/jira/browse/FLINK-33549
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: luoyuxia
>Priority: Major
>
> When running a job in batch mode, it throws the following exception:
> {code:java}
> java.lang.NullPointerException: Factory does not implement interface 
> org.apache.flink.streaming.api.operators.YieldingOperatorFactory
>     at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory.getMailboxExecutor(AbstractStreamOperatorFactory.java:67)
>     at 
> org.apache.flink.table.runtime.operators.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:79)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.createOperator(TableOperatorWrapper.java:115)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.createAllOperators(MultipleInputStreamOperatorBase.java:259)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.(MultipleInputStreamOperatorBase.java:88)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.(BatchMultipleInputStreamOperator.java:48)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory.createStreamOperator(BatchMultipleInputStreamOperatorFactory.java:51)
>     at 
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:212)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:60)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:756)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:743)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>     at java.lang.Thread.run(Thread.java:834) {code}
>  
> When I disable multiple-input by setting 
> table.optimizer.multiple-input-enabled = false, the job works. 
> This was likely introduced by FLINK-23621.
> [In 
> here|https://github.com/apache/flink/blob/a1aed4f877099328d4833f8a2781d2edbaaddc70/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java#L60],
>  when the operator factory is an instance of YieldingOperatorFactory, the 
> mailbox executor is set on it. But when it is a 
> BatchMultipleInputStreamOperatorFactory, although the factory itself receives 
> the mailbox executor, it does not propagate it to the operators wrapped by 
> the BatchMultipleInputStreamOperator. Then the exception is thrown.
>  
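
A minimal sketch of the missing propagation step (the propagate() helper and 
wrappedFactories are hypothetical wiring; YieldingOperatorFactory's 
setMailboxExecutor is the existing hook):
{code:java}
import java.util.List;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;

class MailboxPropagationSketch {
    // Hand the mailbox executor to every wrapped factory before
    // createStreamOperator() is called, so the checkNotNull inside
    // getMailboxExecutor() can succeed.
    static void propagate(
            List<StreamOperatorFactory<?>> wrappedFactories, MailboxExecutor mailboxExecutor) {
        for (StreamOperatorFactory<?> wrapped : wrappedFactories) {
            if (wrapped instanceof YieldingOperatorFactory) {
                ((YieldingOperatorFactory<?>) wrapped).setMailboxExecutor(mailboxExecutor);
            }
        }
    }
}
{code}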



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33549) Exception "Factory does not implement interface YieldingOperatorFactory" thrown in batch mode

2023-11-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788967#comment-17788967
 ] 

Shuai Xu commented on FLINK-33549:
--

I'd like to fix this. Could you assign it to me? [~luoyuxia] 

> Exception "Factory does not implement interface YieldingOperatorFactory" 
> thrown in batch mode 
> --
>
> Key: FLINK-33549
> URL: https://issues.apache.org/jira/browse/FLINK-33549
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: luoyuxia
>Priority: Major
>
> When running a job in batch mode, it throws the following exception:
> {code:java}
> java.lang.NullPointerException: Factory does not implement interface 
> org.apache.flink.streaming.api.operators.YieldingOperatorFactory
>     at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory.getMailboxExecutor(AbstractStreamOperatorFactory.java:67)
>     at 
> org.apache.flink.table.runtime.operators.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:79)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.createOperator(TableOperatorWrapper.java:115)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.createAllOperators(MultipleInputStreamOperatorBase.java:259)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.(MultipleInputStreamOperatorBase.java:88)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.(BatchMultipleInputStreamOperator.java:48)
>     at 
> org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory.createStreamOperator(BatchMultipleInputStreamOperatorFactory.java:51)
>     at 
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:212)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:60)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:756)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:743)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>     at java.lang.Thread.run(Thread.java:834) {code}
>  
> When I disable multiple-input by setting 
> table.optimizer.multiple-input-enabled = false, the job works. 
> This was likely introduced by FLINK-23621.
> [In 
> here|https://github.com/apache/flink/blob/a1aed4f877099328d4833f8a2781d2edbaaddc70/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java#L60],
>  when the operator factory is an instance of YieldingOperatorFactory, the 
> mailbox executor is set on it. But when it is a 
> BatchMultipleInputStreamOperatorFactory, although the factory itself receives 
> the mailbox executor, it does not propagate it to the operators wrapped by 
> the BatchMultipleInputStreamOperator. Then the exception is thrown.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28650) Flink SQL Parsing bug for METADATA

2023-10-16 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775580#comment-17775580
 ] 

Shuai Xu commented on FLINK-28650:
--

The bug in partial insert with writable metadata is fixed in FLINK-30922.

> Flink SQL Parsing bug for METADATA
> --
>
> Key: FLINK-28650
> URL: https://issues.apache.org/jira/browse/FLINK-28650
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.4
>Reporter: Jun Qin
>Priority: Major
> Fix For: 1.19.0
>
>
> With the following source/sink tables:
> {code:sql}
> CREATE TABLE sourceTable ( 
> `key` INT, 
> `time` TIMESTAMP(3),
> `value` STRING NOT NULL, 
> id INT 
> ) 
> WITH ( 
> 'connector' = 'datagen', 
> 'rows-per-second'='10', 
> 'fields.id.kind'='sequence', 
> 'fields.id.start'='1', 
> 'fields.id.end'='100' 
> );
> CREATE TABLE sinkTable1 ( 
> `time` TIMESTAMP(3) METADATA FROM 'timestamp', 
> `value` STRING NOT NULL
> ) 
> WITH ( 
>   'connector' = 'kafka',
> ...
> )
> CREATE TABLE sinkTable2 ( 
> `time` TIMESTAMP(3),-- without METADATA
> `value` STRING NOT NULL
> ) 
> WITH ( 
>   'connector' = 'kafka',
> ...
> )
> {code}
> the following three pass the validation:
> {code:sql}
> INSERT INTO sinkTable1
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> INSERT INTO sinkTable2
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> INSERT INTO sinkTable2 (`time`,`value`)
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> {code}
> but this one does not:
> {code:sql}
> INSERT INTO sinkTable1 (`time`,`value`)
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> {code}
> It failed with 
> {code:java}
> Unknown target column 'time'
> {code}
> It seems that when providing column names in INSERT, the METADATA columns 
> have an undesired effect. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28650) Flink SQL Parsing bug for METADATA

2023-10-16 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775580#comment-17775580
 ] 

Shuai Xu edited comment on FLINK-28650 at 10/16/23 7:58 AM:


This bug is fixed in FLINK-30922.


was (Author: JIRAUSER300096):
The bug in partial insert with writable metadata is fixed in FLINK-30922.

> Flink SQL Parsing bug for METADATA
> --
>
> Key: FLINK-28650
> URL: https://issues.apache.org/jira/browse/FLINK-28650
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.4
>Reporter: Jun Qin
>Priority: Major
> Fix For: 1.19.0
>
>
> With the following source/sink tables:
> {code:sql}
> CREATE TABLE sourceTable ( 
> `key` INT, 
> `time` TIMESTAMP(3),
> `value` STRING NOT NULL, 
> id INT 
> ) 
> WITH ( 
> 'connector' = 'datagen', 
> 'rows-per-second'='10', 
> 'fields.id.kind'='sequence', 
> 'fields.id.start'='1', 
> 'fields.id.end'='100' 
> );
> CREATE TABLE sinkTable1 ( 
> `time` TIMESTAMP(3) METADATA FROM 'timestamp', 
> `value` STRING NOT NULL
> ) 
> WITH ( 
>   'connector' = 'kafka',
> ...
> )
> CREATE TABLE sinkTable2 ( 
> `time` TIMESTAMP(3),-- without METADATA
> `value` STRING NOT NULL
> ) 
> WITH ( 
>   'connector' = 'kafka',
> ...
> )
> {code}
> the following three pass the validation:
> {code:sql}
> INSERT INTO sinkTable1
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> INSERT INTO sinkTable2
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> INSERT INTO sinkTable2 (`time`,`value`)
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> {code}
> but this one does not:
> {code:sql}
> INSERT INTO sinkTable1 (`time`,`value`)
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> {code}
> It failed with 
> {code:java}
> Unknown target column 'time'
> {code}
> It seems that when providing column names in INSERT, the METADATA columns 
> have an undesired effect. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28866) Use DDL instead of legacy method to register the test source in JoinITCase

2023-08-21 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757169#comment-17757169
 ] 

Shuai Xu commented on FLINK-28866:
--

Hi, I would like to fix this issue. Could it be assigned to me?

> Use DDL instead of legacy method to register the test source in JoinITCase
> --
>
> Key: FLINK-28866
> URL: https://issues.apache.org/jira/browse/FLINK-28866
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation

2023-08-21 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756785#comment-17756785
 ] 

Shuai Xu edited comment on FLINK-27741 at 8/21/23 9:58 AM:
---

Hi [~chenzihao], it appears that your pull request has not been approved. 
Would you like to continue working on it? If not, I would be happy to fix it.


was (Author: JIRAUSER300096):
Hi, I would like to fix this issue. Could it be assigned to me, please?

> Fix NPE when use dense_rank() and rank() in over aggregation
> 
>
> Key: FLINK-27741
> URL: https://issues.apache.org/jira/browse/FLINK-27741
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: chenzihao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> There is a 'NullPointerException' when using RANK() and DENSE_RANK() in an 
> over window.
> {code:java}
> @Test
>   def testDenseRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY 
> proctime) FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> {code:java}
> @Test
>   def testRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) 
> FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> Exception Info:
> {code:java}
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248)
>   at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248)
>   at scala.collection.SeqLike.size(SeqLike.scala:104)
>   at scala.collection.SeqLike.size$(SeqLike.scala:104)
>   at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95)
>   at 
> scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242)
>   at scala.collection.mutable.Builder.sizeHint(Builder.scala:77)
>   at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76)
>   at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21)
>   at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:232)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUti

[jira] [Commented] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation

2023-08-21 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756785#comment-17756785
 ] 

Shuai Xu commented on FLINK-27741:
--

Hi, I would like to fix this issue. Could it be assigned to me, please?

> Fix NPE when use dense_rank() and rank() in over aggregation
> 
>
> Key: FLINK-27741
> URL: https://issues.apache.org/jira/browse/FLINK-27741
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: chenzihao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> There is a 'NullPointerException' when using RANK() and DENSE_RANK() in an 
> over window.
> {code:java}
> @Test
>   def testDenseRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY 
> proctime) FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> {code:java}
> @Test
>   def testRankOnOver(): Unit = {
> val t = failingDataSource(TestData.tupleData5)
>   .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
> tEnv.registerTable("MyTable", t)
> val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) 
> FROM MyTable"
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
> env.execute()
>   }
> {code}
> Exception Info:
> {code:java}
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248)
>   at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248)
>   at scala.collection.SeqLike.size(SeqLike.scala:104)
>   at scala.collection.SeqLike.size$(SeqLike.scala:104)
>   at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95)
>   at 
> scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95)
>   at 
> scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242)
>   at scala.collection.mutable.Builder.sizeHint(Builder.scala:77)
>   at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76)
>   at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21)
>   at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:232)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454)
>   at 
> org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361)
>   at 
> org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverP

[jira] [Commented] (FLINK-25054) Improve exception message for unsupported hashLength for SHA2 function

2023-08-21 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756784#comment-17756784
 ] 

Shuai Xu commented on FLINK-25054:
--

Hi, I would like to fix this issue. Could it be assigned to me, please?

> Improve exception message for unsupported hashLength for SHA2 function
> --
>
> Key: FLINK-25054
> URL: https://issues.apache.org/jira/browse/FLINK-25054
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.12.3
>Reporter: DingGeGe
>Priority: Major
> Attachments: image-2021-11-25-16-59-56-699.png
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> [Exception SQL]
> SELECT
> SHA2(, 128)
> FROM
>  
> [Effect]
> When the SQL is long, it is hard to tell where the problem is.
> [Reason]
> The built-in function SHA2 does not support a hashLength of 128, but this 
> could not be understood from the exception log.
> [Exception log]
> !image-2021-11-25-16-59-56-699.png!
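
For context, SHA-2 defines digest lengths of 224, 256, 384 and 512 bits, so 
SHA2(..., 128) can never succeed; a validation along these lines (a sketch, 
not the actual Flink code) would make the failure self-explanatory:
{code:java}
class Sha2ValidationSketch {
    static void checkHashLength(int hashLength) {
        if (hashLength != 224 && hashLength != 256 && hashLength != 384 && hashLength != 512) {
            throw new IllegalArgumentException(
                "SHA2 supports hashLength 224, 256, 384 or 512, but got " + hashLength);
        }
    }
}
{code}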



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32824) Port Calcite's fix for the sql like operator

2023-08-09 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752618#comment-17752618
 ] 

Shuai Xu commented on FLINK-32824:
--

Hi lincoln, I'd like to fix this issue. Could you assign it to me?

> Port Calcite's fix for the sql like operator
> 
>
> Key: FLINK-32824
> URL: https://issues.apache.org/jira/browse/FLINK-32824
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0, 1.17.1
>Reporter: lincoln lee
>Priority: Major
> Fix For: 1.19.0
>
>
> We should port the bugfix of the SQL LIKE operator: 
> https://issues.apache.org/jira/browse/CALCITE-1898
> {code}
> The LIKE operator must match '.' (period) literally, not treat it as a 
> wild-card. Currently it treats it the same as '_'.
> {code}
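
To make the expected semantics concrete, here is a toy LIKE-to-regex 
translation (not Flink's or Calcite's implementation) in which only '%' and 
'_' are wildcards and '.' stays literal:
{code:java}
import java.util.regex.Pattern;

class LikeSketch {
    static boolean like(String input, String pattern) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            if (c == '%') {
                regex.append(".*");          // '%' matches any sequence
            } else if (c == '_') {
                regex.append('.');           // '_' matches exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c))); // '.' stays literal
            }
        }
        return input.matches(regex.toString());
    }

    public static void main(String[] args) {
        System.out.println(like("a.c", "a.c")); // true: literal dot matches itself
        System.out.println(like("abc", "a.c")); // false after the fix: '.' is not '_'
        System.out.println(like("abc", "a_c")); // true
    }
}
{code}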



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32219) SQL client hangs when executing EXECUTE PLAN

2023-05-30 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-32219:
-
Description: 
I compiled a plan for an INSERT statement and executed the plan, but the SQL 
client became unresponsive when executing the EXECUTE PLAN statement. I 
confirmed that the Flink job is running normally by checking the Flink 
dashboard. The only issue is that the SQL client becomes stuck and cannot 
accept new commands. I printed the stack trace of the SQL client process, and 
here is a part of it for reference.
{code:java}
"pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
waiting on condition [0x000173e01000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00076e72af20> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
at 
org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37)
at 
org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at 
org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115)
at 
org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown
 Source)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
{code}

  was:
I compiled a plan for an insert statement and then executed the plan. However, 
the SQL client becomes unresponsive when executing the EXECUTE PLAN statement. 
I have checked the Flink dashboard and confirmed that the job is running 
normally. The only issue is that the SQL client is stuck and cannot accept new 
commands. Here is a part of the stack trace for reference.:
{code:java}
"pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
waiting on condition [0x000173e01000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00076e72af20> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
at 
org.apache.flink.table.api.internal

[jira] [Updated] (FLINK-32219) SQL client hangs when executing EXECUTE PLAN

2023-05-30 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-32219:
-
Description: 
I compiled a plan for an insert statement and then executed the plan. However, 
the SQL client becomes unresponsive when executing the EXECUTE PLAN statement. 
I have checked the Flink dashboard and confirmed that the job is running 
normally. The only issue is that the SQL client is stuck and cannot accept new 
commands. Here is a part of the stack trace for reference.:
{code:java}
"pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
waiting on condition [0x000173e01000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00076e72af20> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
at 
org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37)
at 
org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at 
org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115)
at 
org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown
 Source)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
{code}

  was:
I compiled plan for insert statement firstly and then I execute the plan. 
However the sql client is pending after running execute plan statement. Here is 
the part of stacktrace:

{code:java}
"pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
waiting on condition [0x000173e01000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00076e72af20> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
at 
org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37)
at 
org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106)
at java.util.Iterator.fo

[jira] [Updated] (FLINK-32219) sql client would be pending after executing plan of inserting

2023-05-30 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-32219:
-
Affects Version/s: 1.17.1

> sql client would be pending after executing plan of inserting
> -
>
> Key: FLINK-32219
> URL: https://issues.apache.org/jira/browse/FLINK-32219
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.17.1
>    Reporter: Shuai Xu
>Priority: Major
>
> I compiled a plan for an insert statement first and then executed the plan. 
> However, the SQL client hangs after running the EXECUTE PLAN statement. Here 
> is part of the stack trace:
> {code:java}
> "pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
> waiting on condition [0x000173e01000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x00076e72af20> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
>   at 
> org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37)
>   at 
> org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106)
>   at java.util.Iterator.forEachRemaining(Iterator.java:115)
>   at 
> org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115)
>   at 
> org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
>   at 
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
>   at 
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown
>  Source)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown
>  Source)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown
>  Source)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32219) sql client would be pending after executing plan of inserting

2023-05-30 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-32219:


 Summary: sql client would be pending after executing plan of 
inserting
 Key: FLINK-32219
 URL: https://issues.apache.org/jira/browse/FLINK-32219
 Project: Flink
  Issue Type: Bug
Reporter: Shuai Xu


I compiled a plan for an insert statement first and then executed the plan. 
However, the SQL client hangs after running the EXECUTE PLAN statement. Here is 
part of the stack trace:

{code:java}
"pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
waiting on condition [0x000173e01000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00076e72af20> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
at 
org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37)
at 
org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at 
org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115)
at 
org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown
 Source)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31956) Extend the CompiledPlan to read from/write to Flink's FileSystem

2023-04-27 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717082#comment-17717082
 ] 

Shuai Xu commented on FLINK-31956:
--

Hi Jane, I have taken FLINK-31952, which is similar to this task. I think it 
could be achieved by modifying the StreamPlanner and TableEnvironmentImpl. 
Could I take this task?

> Extend the CompiledPlan to read from/write to Flink's FileSystem
> 
>
> Key: FLINK-31956
> URL: https://issues.apache.org/jira/browse/FLINK-31956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client, Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Jane Chan
>Priority: Major
> Fix For: 1.18.0
>
>
> At present, COMPILE/EXECUTE PLAN FOR '${plan.json}' only supports writing 
> to/reading from a local file without the scheme. We propose to extend the 
> support for Flink's FileSystem.
> {code:sql}
> -- before
> COMPILE PLAN FOR '/tmp/foo/bar.json' 
> EXECUTE PLAN FOR '/tmp/foo/bar.json' 
> -- after
> COMPILE PLAN FOR 'file:///tmp/foo/bar.json' 
> COMPILE PLAN FOR 'hdfs:///tmp/foo/bar.json' 
> COMPILE PLAN FOR 's3:///tmp/foo/bar.json' 
> COMPILE PLAN FOR 'oss:///tmp/foo/bar.json'  
> EXECUTE PLAN FOR 'file:///tmp/foo/bar.json'
> EXECUTE PLAN FOR 'hdfs:///tmp/foo/bar.json'
> EXECUTE PLAN FOR 's3:///tmp/foo/bar.json'
> EXECUTE PLAN FOR 'oss:///tmp/foo/bar.json' {code}
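
For illustration, the scheme-prefixed paths above work naturally once the plan file is read through Flink's FileSystem abstraction instead of java.io.File. A minimal sketch, assuming only the standard org.apache.flink.core.fs API (the wrapper class itself is hypothetical):

{code:java}
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public final class PlanFileReader {

    // Reads a compiled plan JSON from any scheme Flink's FileSystems support
    // (file://, hdfs://, s3://, oss://); a path without a scheme falls back
    // to the configured default file system.
    public static String readPlanJson(String uri) throws Exception {
        Path path = new Path(uri);
        FileSystem fs = path.getFileSystem(); // resolves the FS for the scheme
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (FSDataInputStream in = fs.open(path)) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
{code}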





[jira] [Comment Edited] (FLINK-31952) Support 'EXPLAIN' statement for CompiledPlan

2023-04-27 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717031#comment-17717031
 ] 

Shuai Xu edited comment on FLINK-31952 at 4/27/23 7:31 AM:
---

Hi Jane, I have some ideas for the task, and this is how I see it.
Purpose of the task:
Support explain [plan for | ] '/path-to-json.json'
The steps that I think could resolve the task:
1. Modify the parserImpl.ftl template to add syntax support, which can be 
verified by FlinkSqlParserImplTest.
2. Translate the SQL to an Operation.
3. TableEnv#explainPlan is a ready API; we just get the internal plan from the 
Operation and pass it to TableEnv#explainPlan.
4. Finally, add tests in SqlOperationConverterTest and TableEnvironmentTest for 
verification.
Could I take the task?


was (Author: JIRAUSER300096):
Hi Jane, I have some ideas for the task, and this is how I see it.
Purpose of the task:
Support explain [plan for | ] '/path-to-json.json'
The steps that I think could resolve the task:
1. Modify the parserImpl.ftl template to add syntax support, which can be 
verified by FlinkSqlParserImplTest.
2. Translate the SQL to an Operation.
3. TableEnv#explainPlan is a ready API; we just get the internal plan from the 
Operation and pass it to TableEnv#explainPlan.
4. Finally, add tests in SqlOperationConverterTest and TableEnvironmentTest for 
verification.
Could I take the task?

> Support 'EXPLAIN' statement for CompiledPlan
> 
>
> Key: FLINK-31952
> URL: https://issues.apache.org/jira/browse/FLINK-31952
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Jane Chan
>Priority: Major
> Fix For: 1.18.0
>
>
>  Support the explain SQL syntax towards serialized CompiledPlan
> {code:sql}
> EXPLAIN [  | PLAN FOR]  
> {code}
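
As a rough illustration of step 3, the plan can already be loaded and explained through the public CompiledPlan API; the proposed EXPLAIN statement would route here after parsing (a sketch; the path is a placeholder):

{code:java}
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainCompiledPlanSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Load the serialized plan and print its explanation.
        CompiledPlan plan = tEnv.loadPlan(PlanReference.fromFile("/path-to-json.json"));
        System.out.println(plan.explain());
    }
}
{code}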





[jira] [Commented] (FLINK-31952) Support 'EXPLAIN' statement for CompiledPlan

2023-04-27 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717031#comment-17717031
 ] 

Shuai Xu commented on FLINK-31952:
--

Hi Jane, I have some ideas for the task, and this is how I see it.
Purpose of the task:
Support explain [plan for | ] '/path-to-json.json'
The steps that I think could resolve the task:
1. Modify the parserImpl.ftl template to add syntax support, which can be 
verified by FlinkSqlParserImplTest.
2. Translate the SQL to an Operation.
3. TableEnv#explainPlan is a ready API; we just get the internal plan from the 
Operation and pass it to TableEnv#explainPlan.
4. Finally, add tests in SqlOperationConverterTest and TableEnvironmentTest for 
verification.
Could I take the task?

> Support 'EXPLAIN' statement for CompiledPlan
> 
>
> Key: FLINK-31952
> URL: https://issues.apache.org/jira/browse/FLINK-31952
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Jane Chan
>Priority: Major
> Fix For: 1.18.0
>
>
>  Support the explain SQL syntax towards serialized CompiledPlan
> {code:sql}
> EXPLAIN [  | PLAN FOR]  
> {code}





[GitHub] flink pull request #6360: [FLINK-9884] [runtime] fix slot request may not be...

2018-07-18 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/6360

[FLINK-9884] [runtime] fix slot request may not be removed when it has 
already been assigned in slot manager


## What is the purpose of the change

*(This pull request fixes the bug that a slot request may not be removed from 
pendingSlotRequests in the slot manager when it has already been assigned.)*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added tests in SlotManagerTest.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-9884

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6360.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6360


commit 58e24424401d28647e376a9ee32d7b70d9ca2724
Author: shuai-xu 
Date:   2018-07-18T07:54:55Z

[FLINK-9884] [runtime] fix slot request may not be removed when it has 
already been assigned in slot manager

commit 4d53107a2817e0e3def8ed31926a7b4a97251c1c
Author: shuai-xu 
Date:   2018-07-18T07:56:50Z

adjust the import order




---


[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...

2018-05-08 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/5931
  
@StephanEwen, good idea, I prefer the first one. As for the second one, the 
pending request may have already been fulfilled when the task executor is 
killed, so the job master cannot cancel the pending request. And when the job 
master fails over the job at the same time as the resource manager requests a 
new container, it may ask for one more container than needed.


---


[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...

2018-05-03 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/5931
  
@GJL In blink, we solve this problem like this: when a container completes, we 
first check whether the container has registered. If it has registered before, 
the RM will not request a new container, as the job master will ask for it 
during failover. If it has not registered, the RM will request a new one.
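
In pseudo-code, the policy described above amounts to something like this (a compilable sketch; all names are hypothetical, not the actual blink code):

    import java.util.HashSet;
    import java.util.Set;

    class ContainerCompletionPolicy {
        private final Set<String> registeredContainers = new HashSet<>();

        void onContainerRegistered(String containerId) {
            registeredContainers.add(containerId);
        }

        boolean shouldRequestReplacement(String completedContainerId) {
            // Registered worker: the job master re-requests its resources
            // during failover, so the RM does not replace it here.
            // Unregistered worker: the container died before registering,
            // so the RM requests a new one.
            return !registeredContainers.remove(completedContainerId);
        }
    }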


---


[GitHub] flink pull request #5951: [FLINK-9293] [runtime] SlotPool should check slot ...

2018-05-03 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5951

[FLINK-9293] [runtime] SlotPool should check slot id when accepting a slot 
offer with existing allocation id


## What is the purpose of the change

*This pull request fixes the issue that the job master will accept multiple 
slot offers with the same allocation id, leaking the later slots.*
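
The added check can be sketched as follows (hypothetical names, not the actual SlotPool code):

    import java.util.HashMap;
    import java.util.Map;

    class SlotOfferBookkeeping {
        private final Map<String, String> slotIdByAllocationId = new HashMap<>();

        // Returns true if the offer is accepted. A repeated offer is accepted
        // only when it refers to the very same slot; a different slot under an
        // existing allocation id is rejected so it can be returned to the task
        // manager instead of leaking.
        boolean offerSlot(String allocationId, String slotId) {
            String existing = slotIdByAllocationId.putIfAbsent(allocationId, slotId);
            return existing == null || existing.equals(slotId);
        }
    }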

## Verifying this change

This change added tests and can be verified as follows:

  - *Run the SlotPoolTest*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-9293

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5951.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5951


commit 22ae227e2a19ddf15890dcce779536687328d7ac
Author: shuai.xus <shuai.xus@...>
Date:   2018-05-03T09:13:08Z

[FLINK-9293] [runtime] SlotPool should check slot id when accepting a slot 
offer with existing allocation id




---


[GitHub] flink pull request #5693: [FLINK-8938] [runtime] not remove job graph during...

2018-03-13 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5693

[FLINK-8938] [runtime] not remove job graph during job master failover


## What is the purpose of the change

*This pull request fixes a bug where, during job master failover, the job graph 
may be deleted so that the next job master can no longer recover the job.*
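
Conceptually, the fix keeps the removal of the job graph conditional on the job actually reaching a terminal state (a sketch with hypothetical names):

    class JobGraphRetention {
        interface JobGraphStore {
            void removeJobGraph(String jobId);
        }

        private final JobGraphStore store;

        JobGraphRetention(JobGraphStore store) {
            this.store = store;
        }

        void onJobMasterStopped(String jobId, boolean reachedGloballyTerminalState) {
            if (reachedGloballyTerminalState) {
                store.removeJobGraph(jobId); // finished/canceled: safe to clean up
            }
            // On failover the graph stays in the store so that the next
            // job master leader can recover the job from it.
        }
    }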


## Verifying this change


This change is tested manually.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8938

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5693.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5693


commit 2fc5ec8795b5a53ce0f653a0e0ad5c1346e826c8
Author: shuai.xus <shuai.xus@...>
Date:   2018-03-14T04:16:55Z

[FLINK-8938] [runtime] not remove job graph during job master failover




---


[GitHub] flink pull request #5190: [FLINK-8289] [runtime] set the rest.address to the...

2018-01-15 Thread shuai-xu
Github user shuai-xu closed the pull request at:

https://github.com/apache/flink/pull/5190


---


[GitHub] flink issue #5190: [FLINK-8289] [runtime] set the rest.address to the host o...

2018-01-15 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/5190
  
@tillrohrmann , in fact, the original problem I was concerned with has been 
fixed by your commit "[FLINK-8119] [flip6] Wire correct Flip6 components in 
Flip6YarnClusterDescriptor", which sets RestOptions#REST_ADDRESS to the host 
of the container.


---


[GitHub] flink pull request #5297: [FLINK-8434] Take over the running task manager af...

2018-01-15 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5297

[FLINK-8434] Take over the running task manager after yarn app master 
failover


## What is the purpose of the change

*This pull request enables the YARN resource manager to take over running 
containers from the previous attempt.*
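
On the YARN side this relies on the containers reported for the previous attempt at registration time; a sketch against the YARN client API (the recoverWorker() hook is hypothetical):

    import java.util.List;

    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.client.api.AMRMClient;

    abstract class PreviousAttemptRecovery {

        void registerAndRecover(AMRMClient<AMRMClient.ContainerRequest> rmClient)
                throws Exception {
            RegisterApplicationMasterResponse response =
                rmClient.registerApplicationMaster("appMasterHost", 0, "trackingUrl");
            // Containers still running from the previous app master attempt:
            // treat them as live workers instead of requesting new ones.
            List<Container> previous = response.getContainersFromPreviousAttempts();
            for (Container container : previous) {
                recoverWorker(container);
            }
        }

        abstract void recoverWorker(Container container);
    }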

## Verifying this change

This change is tested manually.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? ( no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8434

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5297.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5297


commit 2b05e621f57f1d6885d37c3aa7972e6755bc1a20
Author: shuai.xus <shuai.xus@...>
Date:   2018-01-15T08:54:40Z

[FLINK-8434] Take over the running task manager after yarn app master 
failover




---


[GitHub] flink pull request #2675: [FLINK-4504][dataset api]Support user to decide wh...

2018-01-15 Thread shuai-xu
Github user shuai-xu closed the pull request at:

https://github.com/apache/flink/pull/2675


---


[GitHub] flink pull request #2674: [FLINK-4444][runtime]Add a InputChannel and Result...

2018-01-15 Thread shuai-xu
Github user shuai-xu closed the pull request at:

https://github.com/apache/flink/pull/2674


---


[GitHub] flink issue #5271: [FLINK-8399] [runtime] use independent configurations for...

2018-01-14 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/5271
  
@tillrohrmann thank you for reviewing, I have modified it.


---


[GitHub] flink issue #5190: [FLINK-8289] [runtime] set the rest.address to the host o...

2018-01-14 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/5190
  
@tillrohrmann @EronWright , this makes it clearer, but it does not seem to 
solve the problem completely. We need to set RestOptions#ADDRESS to the address 
of the rest server so that the client can access it, but we get 0.0.0.0 from 
getRestAddress of the rest server if we let it bind to RestOptions#BIND_ADDRESS 
with 0.0.0.0, unless we add another method to the rest server that returns the 
advertised address.


---


[GitHub] flink issue #5170: [FLINK-8266] [runtime] add network memory to ResourceProf...

2018-01-14 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/5170
  
@tillrohrmann sorry, the conflict is resolved now


---


[GitHub] flink issue #5186: [FLINK-8288] [runtime] register job master rest endpoint ...

2018-01-10 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/5186
  
@NicoK , thank you for reviewing, I have adjusted the indent.


---


[GitHub] flink pull request #5271: [FLINK-8399] [runtime] use independent configurati...

2018-01-09 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5271

[FLINK-8399] [runtime] use independent configurations for the different 
timeouts in slot manager


## What is the purpose of the change

*This pull request separates the timeouts in the slot manager for a slot 
request to a task manager, for a slot request to be discarded, and for a task 
manager to be released into three different configuration options.*
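
In Flink's configuration style that would look roughly like this (keys and defaults here are illustrative, not the committed ones):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    public class SlotManagerTimeouts {
        // Timeout for a slot request sent to a task manager.
        public static final ConfigOption<Long> TASK_MANAGER_REQUEST_TIMEOUT =
            ConfigOptions.key("slotmanager.request-timeout.ms").defaultValue(600_000L);
        // Timeout after which an unfulfilled slot request is discarded.
        public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
            ConfigOptions.key("slotmanager.slot-request-timeout.ms").defaultValue(600_000L);
        // Timeout after which an idle task manager is released.
        public static final ConfigOption<Long> TASK_MANAGER_TIMEOUT =
            ConfigOptions.key("slotmanager.taskmanager-timeout.ms").defaultValue(30_000L);
    }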




## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8399

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5271.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5271


commit f7024439ead5e3848c705659bfe221b8ce50f154
Author: shuai.xus <shuai.xus@...>
Date:   2018-01-10T07:43:20Z

[FLINK-8399] [runtime] use independent configurations for the different 
timeouts in slot manager




---


[GitHub] flink pull request #5190: [FLINK-8289] [runtime] set the rest.address to the...

2017-12-24 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5190#discussion_r158627004
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ---
@@ -155,6 +156,7 @@ protected void runCluster(Configuration configuration) 
throws Exception {
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, 
commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, 
commonRpcService.getPort());
+   configuration.setString(RestOptions.REST_ADDRESS, 
commonRpcService.getAddress());
--- End diff --

I think what we want to get from the RestServerEndpoint is its server 
address. One way is to let its bind address be the real IP of the machine. 
The common rpc address is now the real IP.


---


[GitHub] flink pull request #5190: [FLINK-8289] [runtime] set the rest.address to the...

2017-12-20 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5190

[FLINK-8289] [runtime] set the rest.address to the host of the rest server 
machine


## What is the purpose of the change

This pull request sets rest.address to the host of the Dispatcher or 
JobMaster, so that the real address of the rest server is used instead of 
0.0.0.0:9067 or 127.0.0.0:9067.



## Verifying this change

This change is tested manually.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8289

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5190.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5190


commit 0956b94a910df071a61aae90cac7fef2b795ed0c
Author: shuai.xus <shuai.xus@...>
Date:   2017-12-20T09:37:10Z

[FLINK-8289] [runtime] set the rest.address to the host of the rest server 
machine




---


[GitHub] flink pull request #5186: [FLINK-8288] [runtime] register job master rest en...

2017-12-19 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5186

[FLINK-8288] [runtime] register job master rest endpoint url to yarn


## What is the purpose of the change

This pull request passes the endpoint URL of the job master's rest server to 
the resource manager so it can register the URL with YARN or Mesos.

## Verifying this change


This change is tested manually.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8288

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5186.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5186


commit 9a65d995454fe8d49ba6b84094ea7e5ee687836e
Author: shuai.xus <shuai.xus@...>
Date:   2017-12-20T02:10:35Z

[FLINK-8288] [runtime] register job master rest endpoint url to yarn




---


[GitHub] flink pull request #5170: [FLINK-8266] [runtime] add network memory to Resou...

2017-12-15 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5170

[FLINK-8266] [runtime] add network memory to ResourceProfile for the input 
and output memory of a task


## What is the purpose of the change

This pull request adds a network memory field to ResourceProfile, so that the 
job master can set the network memory of a task according to its number of 
input channels and output subpartitions.
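
The sizing idea can be illustrated as follows (the formula and constants are illustrative, not the committed ones):

    final class NetworkMemoryEstimator {
        private static final int MEMORY_SEGMENT_SIZE_BYTES = 32 * 1024;
        private static final int BUFFERS_PER_CHANNEL = 2;

        // Each input channel and output subpartition needs a few network
        // buffers, so the network memory grows with the task's fan-in/fan-out.
        static int networkMemoryInMB(int numInputChannels, int numOutputSubpartitions) {
            long bytes = (long) (numInputChannels + numOutputSubpartitions)
                    * BUFFERS_PER_CHANNEL * MEMORY_SEGMENT_SIZE_BYTES;
            return (int) Math.ceil(bytes / (1024.0 * 1024.0));
        }
    }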



## Verifying this change

This change can be verified by running ResourceProfileTest:

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8266

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5170


commit 93822bfac6a0794b2b2047e046dcef93c5313185
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-12-15T10:43:27Z

[FLINK-8266] add network memory to ResourceProfile for the input and output 
memory of a task




---


[GitHub] flink pull request #4991: [FLINK-7928] [runtime] extend the resources in Res...

2017-12-13 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4991#discussion_r156848154
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ---
@@ -52,6 +63,15 @@
/** How many native memory in mb are needed */
private final int nativeMemoryInMB;
 
+   /** A extensible field for user specified resources from {@link 
ResourceSpec}. */
+   private final Map<String, Double> extendedResources = new TreeMap<>();
--- End diff --

HashMap is not ordered, so when comparing two ResourceProfiles the result 
may differ if the extended resources were added in a different order. Using 
TreeMap guarantees that the comparison of two ResourceProfiles is always the 
same when the extended resources are the same.
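
A quick illustration of the determinism argument:

    import java.util.Map;
    import java.util.TreeMap;

    public class TreeMapOrderDemo {
        public static void main(String[] args) {
            // TreeMap sorts its keys, so iteration order (and any
            // order-sensitive comparison or serialization built on it)
            // is independent of insertion order.
            Map<String, Double> first = new TreeMap<>();
            first.put("GPU", 1.0);
            first.put("FPGA", 2.0);

            Map<String, Double> second = new TreeMap<>();
            second.put("FPGA", 2.0);
            second.put("GPU", 1.0);

            System.out.println(first);  // {FPGA=2.0, GPU=1.0}
            System.out.println(second); // {FPGA=2.0, GPU=1.0}
        }
    }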


---


[GitHub] flink pull request #5139: [FLINK-8224] [runtime] shutdown application when j...

2017-12-08 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5139

[FLINK-8224] [runtime] shutdown application when job terminated in job mode

## What is the purpose of the change

The current job cluster entrypoint doesn't call the resource manager to shut 
down the application, so the resource manager has no chance to report the 
application status to the outer resource management system such as YARN/Mesos. 
This may make YARN still consider the application as running even though the 
job is finished.

## Verifying this change

This change is tested manually.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8224

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5139.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5139


commit b047b2a50791f4eeeb4c3a984d060ffdbf57ea26
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-12-08T10:02:42Z

[FLINK-8224] [runtime] shutdown application when job terminated in job mode




---


[GitHub] flink pull request #4991: [FLINK-7928] [runtime] extend the resources in Res...

2017-12-07 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4991#discussion_r155708446
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ---
@@ -61,16 +80,26 @@
 * @param heapMemoryInMB The size of the heap memory, in megabytes.
 * @param directMemoryInMB The size of the direct memory, in megabytes.
 * @param nativeMemoryInMB The size of the native memory, in megabytes.
+* @param memoryForInputInMB The size of the memory for input, in 
megabytes.
+* @param memoryForOutputInMB The size of the memory for output, in 
megabytes.
 */
public ResourceProfile(
double cpuCores,
int heapMemoryInMB,
int directMemoryInMB,
-   int nativeMemoryInMB) {
+   int nativeMemoryInMB,
+   int memoryForInputInMB,
+   int memoryForOutputInMB,
--- End diff --

I think a resource spec contains the resources users need to run their code, 
while a resource profile contains the resources for running a task. So the 
resource profile should also contain the part of the resources used by the 
flink system. We divide that part into memoryForInputInMB and 
memoryForOutputInMB, and separate them from heap memory and direct memory, so 
that different resource managers can choose different strategies. For example, 
a per-job resource manager needs all these resources when allocating a task 
manager, but a session resource manager may not consider memoryForInputInMB and 
memoryForOutputInMB when assigning a slot, as that part is decided when the 
session cluster is created. Do you think it makes sense?


---


[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

2017-12-07 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4911
  
@tillrohrmann , I agree with you that adding a builder looks better; I 
changed it according to your comments. Do you think it works now?


---


[GitHub] flink issue #4988: [FLINK-8030] Instantiate JobMasterRestEndpoint in JobClus...

2017-11-30 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4988
  
This pr is very helpful for us, thank you. I have tried it on our cluster 
in yarn job mode, and it works well. My only suggestion is to change the 
default value of rest.address to 0.0.0.0 instead of localhost: if the rest 
server starts on localhost, it can only be accessed from the local machine, but 
on 0.0.0.0 it can be accessed remotely.


---


[GitHub] flink pull request #4985: [FLINK-8027] Generalize existing rest handlers to ...

2017-11-28 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4985#discussion_r153708910
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Simple {@link TaskExecutorGateway} implementation for testing purposes.
+ */
+public class TestingTaskExecutorGateway implements TaskExecutorGateway {
+
+   private final String address;
+
+   private final String hostname;
+
+   public TestingTaskExecutorGateway() {
+   this("foobar:1234", "foobar");
+   }
+
+   public TestingTaskExecutorGateway(String address, String hostname) {
+   this.address = Preconditions.checkNotNull(address);
+   this.hostname = Preconditions.checkNotNull(hostname);
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, String targetAddress, ResourceManagerId resourceManagerId, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public void failPartition(ExecutionAttemptID executionAttemptID) {
+   // noop
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
+   // noop
+   }
+
+   @Override
+   public void heartbeatFromResourceManager(ResourceID heartbeatOrigin) {
+   // noop
+   }
+
+   @Override
+   public void disconnectJobManager(JobID jobId, Exception cause) {
 

[GitHub] flink pull request #4991: [FLINK-7928] [runtime] extend the resources in Res...

2017-11-09 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/4991

[FLINK-7928] [runtime] extend the resources in ResourceProfile for 
precisely calculating the resource of task manager

Notes:  this pull request contains the #4911 since it depends on it.

## What is the purpose of the change

This pull request makes tasks extendable with ResourceSpec (#4911), and adds 
two fields for calculating the memory an operator needs to communicate with its 
upstream and downstream.

## Brief change log

  - *Add an extendedResource field for extendable resources in ResourceSpec*
  - *Add memoryForInputInMB and memoryForOutputInMB for the memory an operator 
needs to communicate with its upstream and downstream*
  - *Add a fromResourceSpec method for transforming a ResourceSpec into a 
ResourceProfile*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added test in ResourceProfileTest*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-7928

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4991.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4991


commit 3e1d61a33f18b351424d4684cbaebc22674f582c
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-10-25T06:56:35Z

[FLINK-7878] [api] make resource type extendible in ResourceSpec

Summary:
Now, flink only supports user-defined CPU and MEM,
but some users need to specify GPU, FPGA and other resources,
so the resource type needs to be made extendible in ResourceSpec.
Add an extended field for new resources.

Test Plan: UnitTest

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D327427

commit d769fe5d0184cd6ac264fd42552d290ae6978fbb
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-11-08T09:10:01Z

make Resource abstract and add GPUResource FPGAResource

commit f897d1fa1742c8186c93bb60abfd8719f156c7da
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-11-08T09:20:22Z

enhance test

commit b8e882b9f39f5588338297ce227e200c6527b84b
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-11-10T02:00:08Z

make create protected

commit 41cf6e4c7e68ef84d9d84e909b417fc6ddc794a6
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-11-10T03:02:21Z

make constructor public

commit 931e279e5a85f38e6cd9e53169fd37b8ce2d87ad
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-10-26T09:38:04Z

[FLINK-7928] [runtime] extend the resources in ResourceProfile for 
precisely calculating the resource of task manager

Summary:
ResourceProfile denotes the resource requirements of a task. It should 
contains:
1. The resource for the operators: the resources in ResourceSpec (please 
refer to jira-7878)
2. The resource for the task to communicate with its upstreams.
3. The resource for the task to communicate with its downstreams.
Now the ResourceProfile only contains the first part. Adding the last two 
parts.

Test Plan: UnitTests

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D330364

commit 6665d570882efa49e35251092385efc8fb6adeb8
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-10-27T07:43:25Z

modify compare

commit 739564db031febd5bb029f08df3ced1ef539c7e6
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-10-30T04:01:42Z

add more denotes

commit c39c3597c1094bb258556d8d6dc12e5305903ea8
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-11-10T02:55:26Z

rebase with 7878




---


[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

2017-11-08 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4911
  
@tillrohrmann, I made Resource abstract and added GPUResource and 
FPGAResource, so users can only add such predefined resources. How about that?


---


[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-11-08 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149610252
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -61,18 +79,17 @@
/** How many state size in mb are used */
private final int stateSizeInMB;
 
+   private final Map<String, Resource> extendedResources = new 
HashMap<>(1);
--- End diff --

done, and add a test for it.


---


[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

2017-11-07 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4911
  
@tillrohrmann, there is no generic way for both Yarn and Mesos, as their 
resource allocation interfaces are different. I think the YARN/MESOS resource 
managers should handle it in their own way. For example, the YarnResourceManager 
can add all extended resources to the yarn Resource class by calling 
setResourceValue(name, value). Then, whenever YARN supports a new resource 
type, users can specify it without any code change in flink.
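
For example, with Hadoop 3's resource-types API that could look like the following (a sketch; the wiring around it is hypothetical):

    import java.util.Map;

    import org.apache.hadoop.yarn.api.records.Resource;

    class ExtendedResourceMapper {
        static Resource toYarnResource(int memoryMB, int vcores,
                                       Map<String, Double> extendedResources) {
            Resource capability = Resource.newInstance(memoryMB, vcores);
            for (Map.Entry<String, Double> entry : extendedResources.entrySet()) {
                // Works for any resource type the YARN cluster has configured.
                capability.setResourceValue(entry.getKey(), entry.getValue().longValue());
            }
            return capability;
        }
    }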


---


[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

2017-11-06 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4911
  
@tillrohrmann, yes, the Resource is a little too generic and prone to 
typos. However, resources are various and closely related to the 
platform (YARN/MESOS); only a GPUResource and FPGAResource may not satisfy 
users' needs. For example, we have at least two types of FPGA resources in our 
cluster. And we could consider the users who need to specify extended resources 
as advanced users. General users only need to know vcores and memory, which are 
already defined in ResourceSpec. Advanced users should be familiar with not only 
flink but also the resource platform; they should know the resource types 
YARN/MESOS supports. And if the flink resource manager passes all the extended 
resources to YARN/MESOS when starting a container, it need not change when a 
new resource type is added, as long as YARN/MESOS can recognize it from the 
extended resources. There has to be a compromise between extensibility and ease 
of use. I suggest we add a general GPUResource and FPGAResource for common use 
while still keeping the Resource open for extension. Does this make sense?


---


[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-03 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148718854
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -262,23 +263,36 @@ public void disconnectResourceManager() {
	// ------------------------------------------------------------------------

 
@Override
-   public CompletableFuture allocateSlot(
-   ScheduledUnit task,
--- End diff --

Why put the ScheduledUnit as a parameter here? I think the interface in 
the slot pool should be clean: it should only have resource-related parameters, 
not schedule-related parameters.


---


[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

2017-11-03 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4887#discussion_r148716173
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -874,6 +894,13 @@ public void handleError(final Exception exception) {
 */
public abstract boolean stopWorker(ResourceID resourceID);
 
+   /**
+    * Cancel the allocation of a resource. If the resource allocation has not been fulfilled yet, it should be cancelled.
+    *
+    * @param resourceProfile The resource description of the previous allocation
+    */
+   public abstract void cancelNewWorker(ResourceProfile resourceProfile);
--- End diff --

I commented on this in the slot manager.


---


[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

2017-11-03 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4887#discussion_r148715689
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
 ---
@@ -52,4 +52,6 @@
 * @param cause of the allocation failure
 */
void notifyAllocationFailure(JobID jobId, AllocationID allocationId, 
Exception cause);
+
+   void cancelResourceAllocation(ResourceProfile resourceProfile);
--- End diff --

I commented on this in the slot manager.


---


[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

2017-11-03 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4887#discussion_r148715651
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -302,7 +302,12 @@ public boolean unregisterSlotRequest(AllocationID allocationId) {
PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
 
if (null != pendingSlotRequest) {
-   cancelPendingSlotRequest(pendingSlotRequest);
+   if (pendingSlotRequest.isAssigned()) {
+   cancelPendingSlotRequest(pendingSlotRequest);
+   }
+   else {
+   resourceActions.cancelResourceAllocation(pendingSlotRequest.getResourceProfile());
--- End diff --

Yes, the SlotManager can decide to release the resources that exceed what is 
needed. But consider a worst case:
1. The MESOS or YARN cluster does not have enough resources at the moment.
2. A job asks for 100 workers of size A.
3. As there are not enough resources, the job fails over; the previous 100 
requests are not cancelled, and it asks for another 100.
4. This repeats several times, so the pending requests for workers of size A 
keep piling up.
5. A worker of size B crashes, so the job now only needs 100 workers of size A 
and 1 worker of size B. But YARN or MESOS still believes the job needs all the 
accumulated requests for A plus 1 B, as the requests are never cancelled.
6. The MESOS/YARN cluster now has resources for 110 workers of size A, more 
than the 100 A and 1 B actually needed, and it begins to assign resources to 
the job, but it first tries to fulfil the accumulated requests of size A, so 
the job still cannot start because it lacks container B.
7. This may prevent the job from starting for a long time even after the 
cluster resources have become sufficient.
8. And this did happen in our cluster, as our cluster is busy. So I think 
it's better to keep this protocol, and different resource managers can treat it 
according to their needs.


---


[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-01 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/4937

[FLINK-6434] [runtime] cancel slot allocation if request timed out in 
ProviderAndOwner


## What is the purpose of the change

This pr adds a cancel-slot-allocation protocol between ProviderAndOwner and 
SlotPool, so that ProviderAndOwner can cancel slot allocations that are no 
longer needed and avoid slot leaks.

## Brief change log

  - *Let ProviderAndOwner generate the allocation id before calling 
allocateSlot on the slot pool.*
  - *If the allocateSlot call times out, ProviderAndOwner cancels the 
previous allocation in the slot pool.*
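
Put together, the protocol looks roughly like this (a sketch; the gateway interface stands in for the real SlotPool API):

    import java.util.UUID;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    class SlotRequester {
        interface SlotPoolGateway {
            CompletableFuture<Object> allocateSlot(UUID allocationId);
            void cancelSlotAllocation(UUID allocationId);
        }

        Object requestSlot(SlotPoolGateway slotPool, long timeoutMillis) throws Exception {
            // The caller generates the allocation id up front, so it is known
            // even if the request never returns a slot.
            UUID allocationId = UUID.randomUUID();
            CompletableFuture<Object> future = slotPool.allocateSlot(allocationId);
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Cancel the pending allocation so the slot cannot leak.
                slotPool.cancelSlotAllocation(allocationId);
                throw e;
            }
        }
    }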

## Verifying this change

This change added tests and can be verified as follows:

  - *Added unittest in SlotPoolRpcTest*
  - *Modify the existing SlotPoolTest*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-6434

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4937.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4937


commit ab3c599d55847451a1194ba55375207267561a71
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-10-20T09:12:39Z

[FLINK-6434] cancel slot allocation if request timed out in ProviderAndOwner

Summary:
This fixes flink jira #6434
1. Let the ProviderAndOwner generate the allocation id before calling 
allocateSlot to slot pool.
2. If the allocateSlot call timed out, ProviderAndOwner cancels the previous 
allocation to slot pool.

Test Plan: UnitTest

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D323990




---


[GitHub] flink pull request #3398: [FLINK-5856] [FLIP-6] return redundant containers ...

2017-11-01 Thread shuai-xu
Github user shuai-xu closed the pull request at:

https://github.com/apache/flink/pull/3398


---


[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

2017-10-27 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4911
  
I run the failed test on my machine and it pass, and it seems my changes 
will not influence it.


---


[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-10-27 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/4911

[FLINK-7878] [api] make resource type extendible in ResourceSpec

Summary:
Now, flink only supports user-defined CPU and MEM,
but some users need to specify GPU, FPGA and other resources,
so the resource type needs to be made extendible in ResourceSpec.
Add an extended field for new resources.

## What is the purpose of the change

This pull request adds an extensible field to ResourceSpec, so users can 
define arbitrary resources as long as they are supported by their resource 
manager.

*(for example:)*
users can use
_text.flatMap().setResource(new ResourceSpec(1, 100, new 
ResourceSpec.Resource("GPU", 0.5)));_
to define their GPU requirement for the operator.

## Verifying this change
This change added tests and can be verified as follows:
  - *Added unit test ResourceSpecTest to verify.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-7878

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4911.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4911


commit 3e1d61a33f18b351424d4684cbaebc22674f582c
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-10-25T06:56:35Z

[FLINK-7878] [api] make resource type extendible in ResourceSpec

Summary:
Now, flink only supports user-defined CPU and MEM,
but some users need to specify GPU, FPGA and other resources,
so the resource type needs to be made extendible in ResourceSpec.
Add an extended field for new resources.

Test Plan: UnitTest

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D327427




---


[GitHub] flink issue #4887: [FLINK-7870] [runtime] Cancel slot allocation to RM when ...

2017-10-24 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4887
  
@tillrohrmann could you help to review this pr? Thank you.


---


[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

2017-10-23 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/4887

[FLINK-7870] [runtime] Cancel slot allocation to RM when requestSlot timed 
out in SlotPool


## What is the purpose of the change

This pull request adds a cancelSlotRequest rpc protocol between the slot pool 
and the resource manager. When a pending request times out in the slot pool, it 
sends a cancelSlotRequest rpc to the resource manager to cancel the previous 
slot request, so that stale slot requests do not accumulate in the resource 
manager.

## Verifying this change
This change added tests and can be verified as follows:
  - *Added a verification in SlotManagerTest to make sure the cancel logic 
works.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4887.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4887


commit daf7fd5745659d94f5a84f669bc90b82b5e69e5e
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-10-17T09:57:18Z

[FLINK-] slot pool cancel slot request to resource manager if timeout

Summary: slot pool cancel slot request to resource manager if timeout

Test Plan: unit test

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D320749

commit 96f80187bb5ef1c268a62bdaf80151a70a04b002
Author: shuai.xus <shuai@alibaba-inc.com>
Date:   2017-10-19T04:13:01Z

add more contract




---


[GitHub] flink issue #3580: [FLINK-5791] Support an optimal matching based slot manag...

2017-08-08 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/3580
  
@tillrohrmann Could you please help to review this pr? And there is a 
question, the pending requests in slot managers are not in order. So requests 
arrived later may be fulfilled earlier, does this matter?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3398: [FLINK-5856] [FLIP-6] return redundant containers to yarn...

2017-07-06 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/3398
  
@tillrohrmann , sorry for the late reply. I will modify it according to your 
comments within one week.


---
---


[GitHub] flink pull request #3539: Flip1: fine grained recovery

2017-07-03 Thread shuai-xu
Github user shuai-xu closed the pull request at:

https://github.com/apache/flink/pull/3539


---
---


[GitHub] flink issue #3755: [FLINK-6351] [YARN] Refactoring YarnFlinkApplicationMaste...

2017-05-11 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/3755
  
There will soon be a YarnFlinkAppMaster extending 
AbstractYarnFlinkApplicationMasterRunner for flink yarn cluster mode, so there 
is no need to combine them into one class.


---
---


[GitHub] flink pull request #3773: [FLINK-5867] [FLINK-5866] [flip-1] Implement Failo...

2017-04-25 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3773#discussion_r113361061
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
 ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * FailoverRegion manages the failover of a minimal pipeline connected sub 
graph.
+ * It will change from CREATED to CANCELING and then to CANCELLED and at 
last to RUNNING,
+ */
+public class FailoverRegion {
+
+   private static final AtomicReferenceFieldUpdater<FailoverRegion, 
JobStatus> STATE_UPDATER =
+   
AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, 
"state");
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(FailoverRegion.class);
+
+   // ------------------------------------------------------------------------

+
+   /** a unique id for debugging */
+   private final AbstractID id = new AbstractID();
+
+   private final ExecutionGraph executionGraph;
+
+   private final List<ExecutionVertex> connectedExecutionVertexes;
+
+   /** The executor that executes the recovery action after all vertices are in a terminal state. */
+   private final Executor executor;
+
+   /** Current status of the job execution */
+   private volatile JobStatus state = JobStatus.RUNNING;
+
+
+   public FailoverRegion(ExecutionGraph executionGraph, Executor executor, List<ExecutionVertex> connectedExecutions) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executor = checkNotNull(executor);
+   this.connectedExecutionVertexes = 
checkNotNull(connectedExecutions);
+
+   LOG.debug("Created failover region {} with vertices: {}", id, 
connectedExecutions);
+   }
+
+   public void onExecutionFail(Execution taskExecution, Throwable cause) {
+   // TODO: check if need to failover the preceding region
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   executionGraph.failGlobal(cause);
+   }
+   else {
+   cancel(taskExecution.getGlobalModVersion());
+   }
+   }
+
+   private void allVerticesInTerminalState(long 
globalModVersionOfFailover) {
+   while (true) {
+   JobStatus curStatus = this.state;
+   if (curStatus.equals(JobStatus.CANCELLING)) {
+   if (transitionState(curStatus, 
JobStatus.CANCELED)) {
+   reset(globalModVersionOfFailover);

  1   2   >