[jira] [Commented] (FLINK-30966) Flink SQL IF FUNCTION logic error
[ https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727780#comment-17727780 ]

Shuiqiang Chen commented on FLINK-30966:
----------------------------------------

[~luoyuxia] Sorry for the delay. I will continue to work on the issue and update the PR as soon as possible.

> Flink SQL IF FUNCTION logic error
> ---------------------------------
>
>                 Key: FLINK-30966
>                 URL: https://issues.apache.org/jira/browse/FLINK-30966
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.0, 1.16.1
>            Reporter: 谢波
>            Assignee: Shuiqiang Chen
>            Priority: Major
>              Labels: pull-request-available
>
> My data is:
> {code:java}
> { "before": { "status": "sent" }, "after": { "status": "succeed" }, "op": "u", "ts_ms": 1671926400225, "transaction": null }
> {code}
> My SQL is:
> {code:java}
> CREATE TABLE t
> (
>     before ROW (
>         status varchar(32)
>     ),
>     after ROW (
>         status varchar(32)
>     ),
>     ts_ms bigint,
>     op string,
>     kafka_timestamp timestamp METADATA FROM 'timestamp',
>     -- @formatter:off
>     proctime AS PROCTIME()
>     -- @formatter:on
> ) WITH (
>     'connector' = 'kafka',
>     -- 'topic' = '',
>     'topic' = 'test',
>     'properties.bootstrap.servers' = ' ',
>     'properties.group.id' = '',
>     'format' = 'json',
>     'scan.topic-partition-discovery.interval' = '60s',
>     'scan.startup.mode' = 'earliest-offset',
>     'json.ignore-parse-errors' = 'true'
> );
> create table p
> (
>     status STRING,
>     before_status STRING,
>     after_status STRING,
>     metadata_operation STRING COMMENT '源记录操作类型',
>     dt STRING
> ) WITH (
>     'connector' = 'print'
> );
> INSERT INTO p
> SELECT
>     IF(op <> 'd', after.status, before.status),
>     before.status,
>     after.status,
>     op AS metadata_operation,
>     DATE_FORMAT(kafka_timestamp, 'yyyy-MM-dd') AS dt
> FROM t;
> {code}
> My local env output is:
> {code:java}
> +I[null, sent, succeed, u, 2023-02-08]
> {code}
> My production env output is:
> {code:java}
> +I[sent, sent, succeed, u, 2023-02-08]
> {code}
> Why? This looks like a bug.
[jira] [Comment Edited] (FLINK-31848) And Operator has side effect when operands have udf
[ https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715419#comment-17715419 ]

Shuiqiang Chen edited comment on FLINK-31848 at 4/23/23 9:39 AM:
-----------------------------------------------------------------

Here is a test case:
{code:scala}
def testRowAndRow(): Unit = {
  val sqlQuery = "SELECT cast(b > 2 and c < 3 as string), b, c FROM MyTableRow"

  val data = List(
    Row.of("Hello", Int.box(1), Int.box(1)),
    Row.of("Hello", null, Int.box(1)),
    Row.of("Hello again", Int.box(3), Int.box(2)))

  implicit val tpe: TypeInformation[Row] =
    new RowTypeInfo(Types.STRING, Types.INT, Types.INT)

  val ds = env.fromCollection(data)
  val t = ds.toTable(tEnv, 'a, 'b, 'c)
  tEnv.createTemporaryView("MyTableRow", t)

  val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
  val sink = new TestingAppendSink
  result.addSink(sink)
  env.execute()

  val expected = List("FALSE,1,1", "TRUE,3,2", "null,null,1")
  assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
{code}
The expected result is List(FALSE,1,1, TRUE,3,2, null,null,1), but we get List(FALSE,1,1, FALSE,null,1, TRUE,3,2).

> And Operator has side effect when operands have udf
> ----------------------------------------------------
>
>                 Key: FLINK-31848
>                 URL: https://issues.apache.org/jira/browse/FLINK-31848
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.2
>            Reporter: zju_zsx
>            Priority: Major
>         Attachments: image-2023-04-19-14-54-46-458.png
>
> {code:java}
> CREATE TABLE kafka_source (
>     `content` varchar,
>     `testid` bigint,
>     `extra` int
> );
> CREATE TABLE console_sink (
>     `content` varchar,
>     `testid` bigint
> )
> with (
>     'connector' = 'print'
> );
> insert into console_sink
> select content, testid + 1
> from kafka_source
> where testid is not null and testid > 0 and my_udf(testid) != 0;
> {code}
> my_udf has a constraint that testid must not be null, but `testid is not null and testid > 0` does not take effect.
>
> In ScalarOperatorGens.generateAnd
> !image-2023-04-19-14-54-46-458.png!
> if left.nullTerm is true, the right code will be executed. It seems that
> {code:java}
> if (!${left.nullTerm} && !${left.resultTerm})
> {code}
> can be safely replaced with
> {code:java}
> if (!${left.resultTerm})
> {code}
> ?
[jira] [Commented] (FLINK-31848) And Operator has side effect when operands have udf
[ https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715419#comment-17715419 ]

Shuiqiang Chen commented on FLINK-31848:
----------------------------------------

Here is a test case:
{code:scala}
def testRowAndRow(): Unit = {
  val sqlQuery = "SELECT cast(b > 2 and c < 3 as string), b, c FROM MyTableRow"

  val data = List(
    Row.of("Hello", Int.box(1), Int.box(1)),
    Row.of("Hello", null, Int.box(1)),
    Row.of("Hello again", Int.box(3), Int.box(2)))

  implicit val tpe: TypeInformation[Row] =
    new RowTypeInfo(Types.STRING, Types.INT, Types.INT)

  val ds = env.fromCollection(data)
  val t = ds.toTable(tEnv, 'a, 'b, 'c)
  tEnv.createTemporaryView("MyTableRow", t)

  val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
  val sink = new TestingAppendSink
  result.addSink(sink)
  env.execute()

  val expected = List("List(FALSE,1,1, TRUE,3,2, null,null,1)")
  assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
{code}
The expected result is List(FALSE,1,1, TRUE,3,2, null,null,1), but we get List(FALSE,1,1, FALSE,null,1, TRUE,3,2).
[jira] [Commented] (FLINK-31848) And Operator has side effect when operands have udf
[ https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715415#comment-17715415 ]

Shuiqiang Chen commented on FLINK-31848:
----------------------------------------

Hi [~zju_zsx], take a table MyTable(a INT, b INT) for instance, with three rows: (1, 1), (null, 1), (3, 3). For the query

select * from MyTable where a < 2 and b < 2

the values of left.nullTerm and left.resultTerm would be:

false, true   // a is not null, a < 2 evaluates to true, so the right code needs to be evaluated
true, false   // a is null, so the result of a < 2 is UNKNOWN; there is no need to evaluate a < 2, but b < 2 is still evaluated as right.code
false, false  // a is not null, a < 2 evaluates to false, so there is no need to evaluate the right code

With if (!left.resultTerm), we assume null < 2 to be false, and it becomes:

TRUE && 1 < 2    -> TRUE
FALSE && skipped -> FALSE
FALSE && skipped -> FALSE

so the final result of the and operation is:

nullTerm  resultTerm
false     false
false     false
false     true

But with if (!left.nullTerm && !left.resultTerm), it should be:

TRUE && 1 < 2    -> TRUE
UNKNOWN && 1 < 2 -> UNKNOWN
FALSE && skipped -> FALSE

so the final result of the and operation is:

nullTerm  resultTerm
false     false
true      false
false     true

It seems that after the simplification the results are not consistent.
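To make the three-valued evaluation above concrete, here is a minimal, self-contained Java sketch of the short-circuit pattern the comment describes. The Operand class and the and(...) helper are hypothetical stand-ins for the generated nullTerm/resultTerm handling, not Flink's actual generated code.

{code:java}
import java.util.function.Supplier;

// Illustrative model of three-valued AND: nullTerm == true means UNKNOWN.
public class ThreeValuedAndSketch {

    static final class Operand {
        final boolean nullTerm;
        final boolean resultTerm;

        Operand(boolean nullTerm, boolean resultTerm) {
            this.nullTerm = nullTerm;
            this.resultTerm = resultTerm;
        }
    }

    // Mirrors the guard `if (!left.nullTerm && !left.resultTerm)`:
    // the right operand is skipped only when the left is a definite FALSE.
    static Operand and(Operand left, Supplier<Operand> right) {
        if (!left.nullTerm && !left.resultTerm) {
            return new Operand(false, false); // FALSE && anything -> FALSE
        }
        Operand r = right.get(); // evaluated when left is TRUE or UNKNOWN
        if (!left.nullTerm && left.resultTerm) {
            return r; // TRUE && x -> x
        }
        // left is UNKNOWN: UNKNOWN && FALSE -> FALSE, otherwise UNKNOWN
        if (!r.nullTerm && !r.resultTerm) {
            return new Operand(false, false);
        }
        return new Operand(true, false);
    }

    public static void main(String[] args) {
        // row (null, 1) against `a < 2 and b < 2`: UNKNOWN && TRUE -> UNKNOWN
        Operand row2 = and(new Operand(true, false), () -> new Operand(false, true));
        System.out.println(row2.nullTerm + " " + row2.resultTerm); // prints "true false"
    }
}
{code}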
[jira] (FLINK-31848) And Operator has side effect when operands have udf
[ https://issues.apache.org/jira/browse/FLINK-31848 ]

Shuiqiang Chen deleted comment on FLINK-31848:
-----------------------------------------------
[jira] [Commented] (FLINK-31848) And Operator has side effect when operands have udf
[ https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715392#comment-17715392 ]

Shuiqiang Chen commented on FLINK-31848:
----------------------------------------

Hi [~zju_zsx], take a table MyTable(a INT, b INT) for instance, with three rows: (1, 1), (null, 2), (3, 3). For the query

select * from MyTable where a < 2 and b < 2

the values of left.nullTerm and left.resultTerm would be:

false, true   // a is not null, a < 2 evaluates to true, so the right code needs to be evaluated
true, false   // a is null, so the result of a < 2 is UNKNOWN; there is no need to evaluate a < 2, but b < 2 is still evaluated as right.code
false, false  // a is not null, a < 2 evaluates to false, so there is no need to evaluate the right code

By simplifying to if (!left.resultTerm), we assume the result of null < 2 to be false, but actually it is UNKNOWN. In terms of correctness, if (!left.resultTerm) and if (!left.nullTerm && !left.resultTerm) are identical. Since the generated code is not exposed to users and this does not affect correctness, both implementations are fine with me.
[jira] [Comment Edited] (FLINK-31848) And Operator has side effect when operands have udf
[ https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715115#comment-17715115 ]

Shuiqiang Chen edited comment on FLINK-31848 at 4/23/23 9:39 AM:
-----------------------------------------------------------------

[~martijnvisser] [~zju_zsx] [~jark] Thanks for your reply. After running several test cases based on release-1.15 (which I previously analyzed) and the latest 1.18-SNAPSHOT branch, it seems that left.resultTerm in left.code is always assigned a boolean value and will never cause a syntax error via `if (!${left.resultTerm})`. Therefore I agree with [~zju_zsx] that the logic can be safely simplified. Still, with `if (!${left.nullTerm} && !${left.resultTerm})`, it is a little more intuitive that ${left.nullTerm} indicates whether the left result is UNKNOWN in three-valued logic.
[jira] [Comment Edited] (FLINK-31848) And Operator has side effect when operands have udf
[ https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715115#comment-17715115 ]

Shuiqiang Chen edited comment on FLINK-31848 at 4/21/23 6:31 PM:
-----------------------------------------------------------------

[~martijnvisser] [~zju_zsx] [~jark] Thanks for your reply. After running several test cases based on release-1.15 (which I previously analyzed) and the latest 1.18-SNAPSHOT branch, it seems that left.resultTerm in left.code is always assigned a boolean value and will never cause a syntax error via `if (!${left.resultTerm})`. Therefore I agree with [~zju_zsx] that the logic can be safely simplified. Still, with `if (!${left.nullTerm} && !${left.resultTerm})`, it is a little more intuitive that ${left.nullTerm} indicates whether the left result is UNKNOWN in three-valued logic.
[jira] [Comment Edited] (FLINK-31848) And Operator has side effect when operands have udf
[ https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715115#comment-17715115 ]

Shuiqiang Chen edited comment on FLINK-31848 at 4/21/23 6:29 PM:
-----------------------------------------------------------------

[~martijnvisser] [~zju_zsx] [~jark] Thanks for your reply. After running several test cases based on release-1.15 (which I previously analyzed) and the latest 1.18-SNAPSHOT branch, it seems that left.resultTerm in left.code is always assigned a boolean value and will never cause a syntax error via _if (!${left.resultTerm})_. Therefore I agree with [~zju_zsx] that the logic can be safely simplified. Still, with _if (!${left.nullTerm} && !${left.resultTerm})_ in the current code base, it is a little more intuitive that ${left.nullTerm} indicates whether the left result is UNKNOWN.
[jira] [Comment Edited] (FLINK-31848) And Operator has side effect when operands have udf
[ https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715115#comment-17715115 ]

Shuiqiang Chen edited comment on FLINK-31848 at 4/21/23 6:18 PM:
-----------------------------------------------------------------

[~martijnvisser] [~zju_zsx] [~jark] Thanks for your reply. After running several test cases based on release-1.15 (which I previously analyzed) and the latest 1.18-SNAPSHOT branch, it seems that left.resultTerm in left.code is always assigned a boolean value and will never cause a syntax error by calling if (!${left.resultTerm}). Therefore I agree with [~zju_zsx] that the logic can be safely simplified.
[jira] [Commented] (FLINK-31848) And Operator has side effect when operands have udf
[ https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715115#comment-17715115 ]

Shuiqiang Chen commented on FLINK-31848:
----------------------------------------

[~martijnvisser] [~zju_zsx] [~jark] Thanks for your reply. After running several test cases based on release-1.15 (which I previously analyzed) and the latest 1.18-SNAPSHOT branch, it seems that left.resultTerm in left.code is always assigned a boolean value and will never cause a syntax error by calling if (!${left.resultTerm}). Therefore I agree with [~zju_zsx] that the logic can be safely simplified.
[jira] [Commented] (FLINK-31848) And Operator has side effect when operands have udf
[ https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17713985#comment-17713985 ]

Shuiqiang Chen commented on FLINK-31848:
----------------------------------------

And without the null check, it can be simplified to if (!${left.resultTerm}), as you mentioned and as at line 671 in ScalarOperatorGens.
[jira] [Commented] (FLINK-31848) And Operator has side effect when operands have udf
[ https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17713984#comment-17713984 ]

Shuiqiang Chen commented on FLINK-31848:
----------------------------------------

Hi, the code block is under the condition that a null check is required, i.e. the value of left.resultTerm might be null, so !${left.resultTerm} would cause a syntax error.
[jira] [Commented] (FLINK-31313) Unsupported meta columns in column list of insert statement
[ https://issues.apache.org/jira/browse/FLINK-31313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17696112#comment-17696112 ]

Shuiqiang Chen commented on FLINK-31313:
----------------------------------------

Hi [~lincoln.86xy], I think it is the same issue as FLINK-30922.

> Unsupported meta columns in column list of insert statement
> ------------------------------------------------------------
>
>                 Key: FLINK-31313
>                 URL: https://issues.apache.org/jira/browse/FLINK-31313
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.17.0, 1.16.1
>            Reporter: lincoln lee
>            Priority: Major
>
> Currently an error is raised when referencing metadata columns in the column list of an insert statement, e.g.,
> {code}
> INSERT INTO sink (a,b,f) -- here `f` is a metadata column of the sink table
> SELECT ...
> {code}
> {code}
> Caused by: org.apache.calcite.runtime.CalciteContextException: At line 1, column 44: Unknown target column 'f'
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)
>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:932)
>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:917)
>     at org.apache.flink.table.planner.calcite.PreValidateReWriter$.newValidationError(PreValidateReWriter.scala:440)
>     at org.apache.flink.table.planner.calcite.PreValidateReWriter$.validateField(PreValidateReWriter.scala:428)
>     at org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:169)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:161)
>     at org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:72)
> {code}
> The cause is that the current PreValidateReWriter, in the validation phase, uses the physical types of the sink table, which do not include metadata columns.
[jira] [Commented] (FLINK-31124) Add it case for HiveTableSink speculative execution
[ https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692146#comment-17692146 ]

Shuiqiang Chen commented on FLINK-31124:
----------------------------------------

Never mind, it doesn't block the contribution.

> Add it case for HiveTableSink speculative execution
> ----------------------------------------------------
>
>                 Key: FLINK-31124
>                 URL: https://issues.apache.org/jira/browse/FLINK-31124
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination, Tests
>            Reporter: Biao Liu
>            Assignee: Shuiqiang Chen
>            Priority: Major
>              Labels: pull-request-available
>
> Part of HiveTableSink has supported speculative execution since https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some integration test cases for this feature.
[jira] [Commented] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test
[ https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692076#comment-17692076 ]

Shuiqiang Chen commented on FLINK-31120:
----------------------------------------

I agree with [~TsReaper] that the variable does not need to be static, since it is accessed only by the current environment. Or is there any other consideration?

> ConcurrentModificationException occurred in StringFunctionsITCase.test
> -----------------------------------------------------------------------
>
>                 Key: FLINK-31120
>                 URL: https://issues.apache.org/jira/browse/FLINK-31120
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.0
>            Reporter: Matthias Pohl
>            Assignee: Shuiqiang Chen
>            Priority: Blocker
>              Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334
> {code}
> Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 10.725 s <<< FAILURE! - in org.apache.flink.table.planner.functions.StringFunctionsITCase
> Feb 17 04:51:25 [ERROR] org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]  Time elapsed: 4.367 s  <<< ERROR!
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute sql
> Feb 17 04:51:25     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> Feb 17 04:51:25     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> Feb 17 04:51:25     at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
> Feb 17 04:51:25     at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
> Feb 17 04:51:25     at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
> Feb 17 04:51:25     at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Feb 17 04:51:25     at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Feb 17 04:51:25     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}
[jira] [Commented] (FLINK-31124) Add it case for HiveTableSink speculative execution
[ https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690754#comment-17690754 ]

Shuiqiang Chen commented on FLINK-31124:
----------------------------------------

Thank you [~zhuzh]. I have created the PR and would be grateful if you could help review it. BTW, the username of the assignee is not my valid one; my current JIRA id is `csq`.
[jira] [Comment Edited] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test
[ https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690523#comment-17690523 ]

Shuiqiang Chen edited comment on FLINK-31120 at 2/17/23 6:17 PM:
-----------------------------------------------------------------

Maybe it needs concurrency control for access to the static field `collectIterators` in `StreamExecutionEnvironment`. Four test cases execute when running StringFunctionsITCase in concurrent execution mode, so there is a chance for one thread to add a collect iterator through `registerCollectIterator` while another thread is running the foreach loop in executeAsync().
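As an illustration of the race, here is a minimal, self-contained Java sketch; the names collectIterators and registerCollectIterator mirror the comment above, but the class is hypothetical and not Flink's code. A CopyOnWriteArrayList is shown as one possible mitigation; making the field non-static, as suggested in this thread, is another.

{code:java}
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CollectIteratorsRaceSketch {
    // With a plain static ArrayList, the foreach below would throw
    // ConcurrentModificationException when another thread adds an iterator
    // mid-iteration. CopyOnWriteArrayList iterates over a snapshot instead.
    private static final List<Object> collectIterators = new CopyOnWriteArrayList<>();

    static void registerCollectIterator(Object iterator) {
        collectIterators.add(iterator); // concurrent adds are safe here
    }

    static void notifyAllIterators() {
        for (Object it : collectIterators) { // snapshot iteration, no CME
            // ... hand each iterator the job execution result ...
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                registerCollectIterator(new Object());
            }
        });
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 1_000; i++) {
                notifyAllIterators();
            }
        });
        writer.start();
        reader.start();
        writer.join();
        reader.join();
        System.out.println("registered iterators: " + collectIterators.size());
    }
}
{code}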
[jira] [Commented] (FLINK-31124) Add it case for HiveTableSink speculative execution
[ https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690524#comment-17690524 ]

Shuiqiang Chen commented on FLINK-31124:
----------------------------------------

Hi [~zhuzh], I would like to help finish this issue; could you please assign it to me?
[jira] [Commented] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test
[ https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690523#comment-17690523 ]

Shuiqiang Chen commented on FLINK-31120:
----------------------------------------

Maybe it needs concurrency control for access to the static field `collectIterators` in `StreamExecutionEnvironment`. Four test cases execute when running StringFunctionsITCase in concurrent execution mode, so there is a chance for one thread to add a collect iterator through `registerCollectIterator` while another thread is running the foreach loop in executeAsync().
[jira] [Comment Edited] (FLINK-30966) Flink SQL IF FUNCTION logic error
[ https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687683#comment-17687683 ]

Shuiqiang Chen edited comment on FLINK-30966 at 2/13/23 2:51 AM:
-----------------------------------------------------------------

[~luoyuxia] The matter of [FLINK-31003|https://issues.apache.org/jira/browse/FLINK-31003] is return type inference, while there is another issue: the code block position of the result term casting might be wrong.
[jira] [Commented] (FLINK-30966) Flink SQL IF FUNCTION logic error
[ https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687683#comment-17687683 ]

Shuiqiang Chen commented on FLINK-30966:
----------------------------------------

[~luoyuxia] The matter of [FLINK-31003|https://issues.apache.org/jira/browse/FLINK-31003] is return type inference, while there is another issue: the code block of the result term casting might be wrong.
[jira] [Commented] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect
[ https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687682#comment-17687682 ]

Shuiqiang Chen commented on FLINK-31003:
----------------------------------------

[~luoyuxia] +1

> Flink SQL IF / CASE WHEN Funcation incorrect
> ---------------------------------------------
>
>                 Key: FLINK-31003
>                 URL: https://issues.apache.org/jira/browse/FLINK-31003
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1
>            Reporter: weiqinpan
>            Priority: Major
>
> When I execute the SQL below using sql-client, I find something wrong.
> {code:java}
> CREATE TEMPORARY TABLE source (
>     mktgmsg_biz_type STRING,
>     marketing_flow_id STRING,
>     mktgmsg_campaign_id STRING
> )
> WITH
> (
>     'connector' = 'filesystem',
>     'path' = 'file:///Users/xxx/Desktop/demo.json',
>     'format' = 'json'
> );
> -- returns the correct value ('marketing_flow_id')
> SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM source;
> -- returns an incorrect value ('')
> SELECT IF(`marketing_flow_id` IS NULL, '', `marketing_flow_id`) FROM source;
> {code}
> The demo.json data is
> {code:java}
> {"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"}
> {code}
> BTW, CASE WHEN combined with IF / IFNULL also goes wrong.
> {code:java}
> -- returns the wrong value (''), expected marketing_flow_id
> select CASE
>     WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN IF(`marketing_flow_id` IS NULL, `marketing_flow_id`, '')
>     WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`)
>     ELSE ''
> END AS `message_campaign_instance_id` FROM source;
> -- returns the wrong value ('')
> select CASE
>     WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN IFNULL(`marketing_flow_id`, '')
>     WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN IFNULL(`mktgmsg_campaign_id`, '')
>     ELSE ''
> END AS `message_campaign_instance_id` FROM source;
> -- returns the correct value; the difference is [else returns ' ']
> select CASE
>     WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN IFNULL(`marketing_flow_id`, '')
>     WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN IFNULL(`mktgmsg_campaign_id`, '')
>     ELSE ' '
> END AS `message_campaign_instance_id` FROM source;
> {code}
[jira] [Comment Edited] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect
[ https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687347#comment-17687347 ]

Shuiqiang Chen edited comment on FLINK-31003 at 2/11/23 2:51 AM:
-----------------------------------------------------------------

Hi [~martijnvisser] [~weiqinpan], I think it is the same issue as [FLINK-30966|https://issues.apache.org/jira/browse/FLINK-30966]: when normalizing arguments in IfCallGen, it always aligns to the type of ARG1, so for IF(1 > 2, 'true', 'false') the result is the string 'fals', whose length is the same as 'true'.
[jira] [Commented] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect
[ https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687347#comment-17687347 ]

Shuiqiang Chen commented on FLINK-31003:
----------------------------------------

Hi [~martijnvisser] [~weiqinpan], I think it is the same issue as [FLINK-30966|https://issues.apache.org/jira/browse/FLINK-30966]: when normalizing arguments in IfCallGen, it always aligns to the type of ARG1, so for IF(1 > 2, 'true', 'false') the result is the string 'fals', whose length is the same as 'true'.
[jira] [Commented] (FLINK-30966) Flink SQL IF FUNCTION logic error
[ https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686843#comment-17686843 ] Shuiqiang Chen commented on FLINK-30966: BTW, the expected result of your query should be: +I[succeed, sent, succeed, u, 2023-02-08] > Flink SQL IF FUNCTION logic error > - > > Key: FLINK-30966 > URL: https://issues.apache.org/jira/browse/FLINK-30966 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0, 1.16.1 >Reporter: 谢波 >Priority: Major > > my data is > {code:java} > // > { "before": { "status": "sent" }, "after": { "status": "succeed" }, "op": > "u", "ts_ms": 1671926400225, "transaction": null } {code} > my SQL is > > {code:java} > CREATE TABLE t > ( > before ROW ( > status varchar (32) > ), > after ROW ( > status varchar (32) > ), > ts_ms bigint, > op string, > kafka_timestamp timestamp METADATA FROM 'timestamp', > -- @formatter:off > proctime AS PROCTIME() > -- @formatter:on > ) WITH ( > 'connector' = 'kafka', > -- 'topic' = '', > 'topic' = 'test', > 'properties.bootstrap.servers' = ' ', > 'properties.group.id' = '', > 'format' = 'json', > 'scan.topic-partition-discovery.interval' = '60s', > 'scan.startup.mode' = 'earliest-offset', > 'json.ignore-parse-errors' = 'true' > ); > create table p > ( > status STRING , > before_status STRING , > after_status STRING , > metadata_operation STRING COMMENT 'source record operation type', > dt STRING > )WITH ( > 'connector' = 'print' > ); > INSERT INTO p > SELECT >IF(op <> 'd', after.status, before.status), > before.status, > after.status, >op AS metadata_operation, >DATE_FORMAT(kafka_timestamp, 'yyyy-MM-dd') AS dt > FROM t; > {code} > my local env output is > > > {code:java} > +I[null, sent, succeed, u, 2023-02-08] {code} > > my production env output is > {code:java} > +I[sent, sent, succeed, u, 2023-02-08] {code} > Why? > This looks like a bug. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-30966) Flink SQL IF FUNCTION logic error
[ https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686842#comment-17686842 ] Shuiqiang Chen edited comment on FLINK-30966 at 2/10/23 2:39 AM: - Hi [~hiscat], I have reproduced the same error and it seems to be a bug in IfCallGen. After investigating the generated code, I found two problems: 1. It performs the result term casting before the calculation logic, so the actual result always refers to a non-initialized field. 2. When normalizing arguments, it always aligns to the type of ARG1, so for IF(1 > 2, 'true', 'false') the result will be the string 'fals', whose length is the same as that of 'true'. I would like to help fix it. was (Author: csq): Hi [~hiscat], I have reproduced the same error and it seems to be a bug in IfCallGen. There are two problems: 1. It does the result term casting before the calculation logic, so the actual result always refers to a non-initialized field. 2. When normalizing arguments, it always aligns to the type of ARG1, so for IF(1 > 2, 'true', 'false') the result will be the string 'fals', whose length is the same as that of 'true'. I would like to help fix it. > Flink SQL IF FUNCTION logic error > - > > Key: FLINK-30966 > URL: https://issues.apache.org/jira/browse/FLINK-30966 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0, 1.16.1 >Reporter: 谢波 >Priority: Major > > my data is > {code:java} > // > { "before": { "status": "sent" }, "after": { "status": "succeed" }, "op": > "u", "ts_ms": 1671926400225, "transaction": null } {code} > my SQL is > > {code:java} > CREATE TABLE t > ( > before ROW ( > status varchar (32) > ), > after ROW ( > status varchar (32) > ), > ts_ms bigint, > op string, > kafka_timestamp timestamp METADATA FROM 'timestamp', > -- @formatter:off > proctime AS PROCTIME() > -- @formatter:on > ) WITH ( > 'connector' = 'kafka', > -- 'topic' = '', > 'topic' = 'test', > 'properties.bootstrap.servers' = ' ', > 'properties.group.id' = '', > 'format' = 'json', > 'scan.topic-partition-discovery.interval' = '60s', > 'scan.startup.mode' = 'earliest-offset', > 'json.ignore-parse-errors' = 'true' > ); > create table p > ( > status STRING , > before_status STRING , > after_status STRING , > metadata_operation STRING COMMENT 'source record operation type', > dt STRING > )WITH ( > 'connector' = 'print' > ); > INSERT INTO p > SELECT >IF(op <> 'd', after.status, before.status), > before.status, > after.status, >op AS metadata_operation, >DATE_FORMAT(kafka_timestamp, 'yyyy-MM-dd') AS dt > FROM t; > {code} > my local env output is > > > {code:java} > +I[null, sent, succeed, u, 2023-02-08] {code} > > my production env output is > {code:java} > +I[sent, sent, succeed, u, 2023-02-08] {code} > Why? > This looks like a bug. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
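To make problem 1 above concrete, the following hand-written sketch (all names invented; this is not the actual Flink-generated code) shows the evaluation order the generated IF code needs: branch assignment first, result casting afterwards, so the cast never observes the non-initialized field:

{code:java}
public class IfCodegenOrderSketch {
    // Stand-in for the generated result-term member field, which starts out
    // at its Java default value, like the field in the generated code.
    private String resultTerm;

    // Correct order: evaluate the condition and assign the chosen branch to
    // the result term first, then apply the cast/normalization. Emitting the
    // cast before this assignment would read the default-initialized field
    // instead of the computed value.
    String evalIf(boolean condition, String trueTerm, String falseTerm) {
        resultTerm = condition ? trueTerm : falseTerm; // assign first
        return normalize(resultTerm);                  // cast/normalize last
    }

    private String normalize(String value) {
        return value; // placeholder for the type-normalization step
    }

    public static void main(String[] args) {
        System.out.println(new IfCodegenOrderSketch().evalIf(1 > 2, "true", "false"));
    }
}
{code}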
[jira] [Comment Edited] (FLINK-30966) Flink SQL IF FUNCTION logic error
[ https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686842#comment-17686842 ] Shuiqiang Chen edited comment on FLINK-30966 at 2/10/23 2:38 AM: Hi [~hiscat], I have reproduced the same error and it seems to be a bug in IfCallGen. There are two problems: 1. It does the result term casting before the calculation logic, so the actual result always refers to a non-initialized field. 2. When normalizing arguments, it always aligns to the type of ARG1, so for IF(1 > 2, 'true', 'false') the result will be the string 'fals', whose length is the same as that of 'true'. I would like to help fix it. was (Author: csq): Hi [~hiscat], I have reproduced the same error and it seems to be a bug in IFCodeGen. There are two problems: 1. It does the result term casting before the calculation logic, so the actual result always refers to a non-initialized field. 2. When normalizing arguments, it always aligns to the type of ARG1, so for IF(1 > 2, 'true', 'false') the result will be the string 'fals', whose length is the same as that of 'true'. I would like to help fix it. > Flink SQL IF FUNCTION logic error > - > > Key: FLINK-30966 > URL: https://issues.apache.org/jira/browse/FLINK-30966 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0, 1.16.1 >Reporter: 谢波 >Priority: Major > > my data is > {code:java} > // > { "before": { "status": "sent" }, "after": { "status": "succeed" }, "op": > "u", "ts_ms": 1671926400225, "transaction": null } {code} > my SQL is > > {code:java} > CREATE TABLE t > ( > before ROW ( > status varchar (32) > ), > after ROW ( > status varchar (32) > ), > ts_ms bigint, > op string, > kafka_timestamp timestamp METADATA FROM 'timestamp', > -- @formatter:off > proctime AS PROCTIME() > -- @formatter:on > ) WITH ( > 'connector' = 'kafka', > -- 'topic' = '', > 'topic' = 'test', > 'properties.bootstrap.servers' = ' ', > 'properties.group.id' = '', > 'format' = 'json', > 'scan.topic-partition-discovery.interval' = '60s', > 'scan.startup.mode' = 'earliest-offset', > 'json.ignore-parse-errors' = 'true' > ); > create table p > ( > status STRING , > before_status STRING , > after_status STRING , > metadata_operation STRING COMMENT 'source record operation type', > dt STRING > )WITH ( > 'connector' = 'print' > ); > INSERT INTO p > SELECT >IF(op <> 'd', after.status, before.status), > before.status, > after.status, >op AS metadata_operation, >DATE_FORMAT(kafka_timestamp, 'yyyy-MM-dd') AS dt > FROM t; > {code} > my local env output is > > > {code:java} > +I[null, sent, succeed, u, 2023-02-08] {code} > > my production env output is > {code:java} > +I[sent, sent, succeed, u, 2023-02-08] {code} > Why? > This looks like a bug. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30966) Flink SQL IF FUNCTION logic error
[ https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686842#comment-17686842 ] Shuiqiang Chen commented on FLINK-30966: Hi [~hiscat], I have reproduced the same error and it seems to be a bug in IFCodeGen. There are two problems: 1. It does the result term casting before the calculation logic, so the actual result always refers to a non-initialized field. 2. When normalizing arguments, it always aligns to the type of ARG1, so for IF(1 > 2, 'true', 'false') the result will be the string 'fals', whose length is the same as that of 'true'. I would like to help fix it. > Flink SQL IF FUNCTION logic error > - > > Key: FLINK-30966 > URL: https://issues.apache.org/jira/browse/FLINK-30966 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0, 1.16.1 >Reporter: 谢波 >Priority: Major > > my data is > {code:java} > // > { "before": { "status": "sent" }, "after": { "status": "succeed" }, "op": > "u", "ts_ms": 1671926400225, "transaction": null } {code} > my SQL is > > {code:java} > CREATE TABLE t > ( > before ROW ( > status varchar (32) > ), > after ROW ( > status varchar (32) > ), > ts_ms bigint, > op string, > kafka_timestamp timestamp METADATA FROM 'timestamp', > -- @formatter:off > proctime AS PROCTIME() > -- @formatter:on > ) WITH ( > 'connector' = 'kafka', > -- 'topic' = '', > 'topic' = 'test', > 'properties.bootstrap.servers' = ' ', > 'properties.group.id' = '', > 'format' = 'json', > 'scan.topic-partition-discovery.interval' = '60s', > 'scan.startup.mode' = 'earliest-offset', > 'json.ignore-parse-errors' = 'true' > ); > create table p > ( > status STRING , > before_status STRING , > after_status STRING , > metadata_operation STRING COMMENT 'source record operation type', > dt STRING > )WITH ( > 'connector' = 'print' > ); > INSERT INTO p > SELECT >IF(op <> 'd', after.status, before.status), > before.status, > after.status, >op AS metadata_operation, >DATE_FORMAT(kafka_timestamp, 'yyyy-MM-dd') AS dt > FROM t; > {code} > my local env output is > > > {code:java} > +I[null, sent, succeed, u, 2023-02-08] {code} > > my production env output is > {code:java} > +I[sent, sent, succeed, u, 2023-02-08] {code} > Why? > This looks like a bug. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30922) SQL validate fail in parsing writable metadata
[ https://issues.apache.org/jira/browse/FLINK-30922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686155#comment-17686155 ] Shuiqiang Chen commented on FLINK-30922: I have created a pull request to fix the issue. Any help reviewing the PR will be highly appreciated. > SQL validate fail in parsing writable metadata > -- > > Key: FLINK-30922 > URL: https://issues.apache.org/jira/browse/FLINK-30922 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.1 >Reporter: tanjialiang >Priority: Major > Labels: pull-request-available > > When I tried a simple demo SQL writing metadata to Kafka in the Flink > SQL client > {code:java} > CREATE TABLE KafkaTable ( > `user_id` BIGINT, > `item_id` BIGINT, > `behavior` STRING, > `ts` TIMESTAMP(3) METADATA FROM 'timestamp' > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'csv' > ) > INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code} > > it throws an error > {code:java} > org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse > statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; > at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown > Source) ~[?:?] > at > org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) > [flink-sql-client-1.16.1.jar:1.16.1] > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) > [flink-sql-client-1.16.1.jar:1.16.1] > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) > [flink-sql-client-1.16.1.jar:1.16.1] > Caused by: org.apache.flink.table.api.ValidationException: SQL validation > failed. From line 1, column 33 to line 1, column 34: Unknown target column > 'ts' > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186) > ~[?:?] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) > ~[?:?] 
> at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) > ~[?:?] > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > ~[?:?] > at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) > ~[flink-sql-client-1.16.1.jar:1.16.1] > ... 13 more > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, > column 33 to line 1, column 34: Unknown target column 'ts' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) ~[?:1.8.0_41] > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > ~[?:1.8.0_41] > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > ~[?:1.8.0_41] > at java.lang.reflect.Constructor.newInstance(Constructor.java:422) > ~[?:1.8.0_41] > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) >
[jira] [Commented] (FLINK-30922) SQL validate fail in parsing writable metadata
[ https://issues.apache.org/jira/browse/FLINK-30922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17685762#comment-17685762 ] Shuiqiang Chen commented on FLINK-30922: Hi [~tanjialiang], thank you for reporting the issue. I have reproduced the same error with the code you provided. According to [FLIP-107|https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors], it is possible to write metadata columns in SQL. The cause of this error is a bug: it excludes all computed columns and metadata columns when doing appendPartitionAndNullsProjects in PreValidateReWriter, while it is actually expected to keep all persisted columns. I would like to fix it. > SQL validate fail in parsing writable metadata > -- > > Key: FLINK-30922 > URL: https://issues.apache.org/jira/browse/FLINK-30922 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.1 >Reporter: tanjialiang >Priority: Major > > When I tried a simple demo SQL writing metadata to Kafka in the Flink > SQL client > {code:java} > CREATE TABLE KafkaTable ( > `user_id` BIGINT, > `item_id` BIGINT, > `behavior` STRING, > `ts` TIMESTAMP(3) METADATA FROM 'timestamp' > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'csv' > ) > INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code} > > it throws an error > {code:java} > org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse > statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; > at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown > Source) ~[?:?] > at > org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) > [flink-sql-client-1.16.1.jar:1.16.1] > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) > [flink-sql-client-1.16.1.jar:1.16.1] > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) > [flink-sql-client-1.16.1.jar:1.16.1] > Caused by: org.apache.flink.table.api.ValidationException: SQL validation > failed. 
From line 1, column 33 to line 1, column 34: Unknown target column > 'ts' > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186) > ~[?:?] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) > ~[?:?] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) > ~[?:?] > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > ~[?:?] > at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) > ~[flink-sql-client-1.16.1.jar:1.16.1] > ... 13 more > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, > column 33 to line 1, column 34: Unknown target column 'ts' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) ~[?:1.8.0_41] > at >
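As a rough illustration of the fix direction described in the comment above (identifiers invented; this is not the actual PreValidateReWriter patch), the rewrite should keep every persisted column, i.e. physical columns plus writable metadata columns, and exclude only virtual ones such as computed columns:

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PersistedColumnsSketch {
    static final class Column {
        final String name;
        final boolean physical;
        final boolean writableMetadata;

        Column(String name, boolean physical, boolean writableMetadata) {
            this.name = name;
            this.physical = physical;
            this.writableMetadata = writableMetadata;
        }

        // Persisted = physical columns plus writable metadata columns; only
        // virtual columns (computed, read-only metadata) are excluded.
        boolean isPersisted() {
            return physical || writableMetadata;
        }
    }

    public static void main(String[] args) {
        List<Column> schema = Arrays.asList(
                new Column("user_id", true, false),    // physical
                new Column("ts", false, true),         // writable metadata -> keep
                new Column("proctime", false, false)); // computed -> drop
        List<String> insertTargets = schema.stream()
                .filter(Column::isPersisted)
                .map(c -> c.name)
                .collect(Collectors.toList());
        System.out.println(insertTargets); // [user_id, ts]
    }
}
{code}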
[jira] [Commented] (FLINK-30817) ClassCastException in TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown
[ https://issues.apache.org/jira/browse/FLINK-30817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682985#comment-17682985 ] Shuiqiang Chen commented on FLINK-30817: [~luoyuxia] I would like to help fix the issue; please assign the Jira ticket to me. > ClassCastException in > TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown > --- > > Key: FLINK-30817 > URL: https://issues.apache.org/jira/browse/FLINK-30817 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0, 1.17.0 >Reporter: Shuiqiang Chen >Priority: Minor > > When applying partitions in > TestValuesScanTableSourceWithoutProjectionPushDown with no partition > provided, the following code will cause a ClassCastException > {code:java} > remainingPartitions = (List<Map<String, String>>) Collections.emptyMap(); > this.data.put(Collections.emptyMap(), Collections.emptyList()); > {code} > {code:java} > java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast > to java.util.List > at > org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown.applyPartitions(TestValuesTableFactory.java:1222) > at > org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec.apply(PartitionPushDownSpec.java:57) > at > org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.onMatch(PushPartitionIntoTableSourceScanRule.java:183) > at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:343) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
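For context, the cast in the snippet above compiles because Map and List are unrelated interface types (the compiler cannot rule out a common subtype), but no actual Map is a List, so it must fail at runtime. A self-contained sketch of the failure plus the obvious correction (the extra (Object) hop is only there to keep this standalone demo compiling; exact exception wording varies by JVM version):

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class EmptyPartitionsSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        try {
            // The buggy pattern: a Map cast to a List compiles but fails at
            // runtime with the ClassCastException quoted in this issue.
            List<Map<String, String>> bad =
                    (List<Map<String, String>>) (Object) Collections.emptyMap();
            System.out.println(bad.size());
        } catch (ClassCastException e) {
            System.out.println("as reported: " + e.getMessage());
        }
        // The straightforward correction: an empty *list* of partition specs.
        List<Map<String, String>> remainingPartitions = Collections.emptyList();
        System.out.println(remainingPartitions); // []
    }
}
{code}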
[jira] [Updated] (FLINK-30817) ClassCastException in TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown
[ https://issues.apache.org/jira/browse/FLINK-30817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-30817: --- Description: When applying partitions in TestValuesScanTableSourceWithoutProjectionPushDown with no partition provided, the following code will cause a ClassCastException {code:java} remainingPartitions = (List<Map<String, String>>) Collections.emptyMap(); this.data.put(Collections.emptyMap(), Collections.emptyList()); {code} {code:java} java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast to java.util.List at org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown.applyPartitions(TestValuesTableFactory.java:1222) at org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec.apply(PartitionPushDownSpec.java:57) at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.onMatch(PushPartitionIntoTableSourceScanRule.java:183) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:343) {code} was: When applying partitions in TestValuesScanTableSourceWithoutProjectionPushDown with no partition provided, the following code will cause a ClassCastException {code:java} remainingPartitions = (List<Map<String, String>>) Collections.emptyMap(); this.data.put(Collections.emptyMap(), Collections.emptyList()); {code} {panel:title=My title} java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast to java.util.List at org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown.applyPartitions(TestValuesTableFactory.java:1222) at org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec.apply(PartitionPushDownSpec.java:57) at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.onMatch(PushPartitionIntoTableSourceScanRule.java:183) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:343) {panel} > ClassCastException in > TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown > --- > > Key: FLINK-30817 > URL: https://issues.apache.org/jira/browse/FLINK-30817 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0, 1.17.0 >Reporter: Shuiqiang Chen >Priority: Minor > > When applying partitions in > TestValuesScanTableSourceWithoutProjectionPushDown with no partition > provided, the following code will cause a ClassCastException > {code:java} > remainingPartitions = (List<Map<String, String>>) Collections.emptyMap(); > this.data.put(Collections.emptyMap(), Collections.emptyList()); > {code} > {code:java} > java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast > to java.util.List > at > org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown.applyPartitions(TestValuesTableFactory.java:1222) > at > org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec.apply(PartitionPushDownSpec.java:57) > at > org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.onMatch(PushPartitionIntoTableSourceScanRule.java:183) > at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:343) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30817) ClassCastException in TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown
[ https://issues.apache.org/jira/browse/FLINK-30817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-30817: --- Priority: Minor (was: Major) > ClassCastException in > TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown > --- > > Key: FLINK-30817 > URL: https://issues.apache.org/jira/browse/FLINK-30817 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0, 1.17.0 >Reporter: Shuiqiang Chen >Priority: Minor > > When applying partitions in > TestValuesScanTableSourceWithoutProjectionPushDown with no partition > provided, the following code will cause a ClassCastException > {code:java} > remainingPartitions = (List<Map<String, String>>) Collections.emptyMap(); > this.data.put(Collections.emptyMap(), Collections.emptyList()); > {code} > {panel:title=My title} > java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast > to java.util.List > at > org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown.applyPartitions(TestValuesTableFactory.java:1222) > at > org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec.apply(PartitionPushDownSpec.java:57) > at > org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.onMatch(PushPartitionIntoTableSourceScanRule.java:183) > at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:343) > {panel} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30817) ClassCastException in TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown
Shuiqiang Chen created FLINK-30817: -- Summary: ClassCastException in TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown Key: FLINK-30817 URL: https://issues.apache.org/jira/browse/FLINK-30817 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.16.0, 1.17.0 Reporter: Shuiqiang Chen When applying partitions in TestValuesScanTableSourceWithoutProjectionPushDown with no partition provided, the following code will cause a ClassCastException {code:java} remainingPartitions = (List<Map<String, String>>) Collections.emptyMap(); this.data.put(Collections.emptyMap(), Collections.emptyList()); {code} {panel:title=My title} java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast to java.util.List at org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown.applyPartitions(TestValuesTableFactory.java:1222) at org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec.apply(PartitionPushDownSpec.java:57) at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.onMatch(PushPartitionIntoTableSourceScanRule.java:183) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:343) {panel} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-28882) ENCODE return error
[ https://issues.apache.org/jira/browse/FLINK-28882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600556#comment-17600556 ] Shuiqiang Chen edited comment on FLINK-28882 at 9/6/22 3:00 AM: I have encountered the same problem in the latest master branch (1.17-SNAPSHOT). !screenshot-1.png! We can see that the result type of the Encode('kyuubi', 'UTF-8') call is BINARY(1). However, the type of the RexLiteral `X'6b7975756269'` is BINARY(6). According to [FLINK-24419 |https://issues.apache.org/jira/browse/FLINK-24419] {code:java} /* Example generated code for BINARY(2): // legacy behavior ((byte[])(inputValue)) // new behavior ((((byte[])(inputValue)).length == 2) ? (((byte[])(inputValue))) : (java.util.Arrays.copyOf(((byte[])(inputValue)), 2))) */ {code} The BINARY(6) is trimmed to BINARY(1) in BinaryToBinaryCastRule during the generation of the CAST call. There are two possible solutions: 1. Apply the legacy cast behaviour with LegacyCastBehaviour.ENABLED in TableConfig when it is an ENCODE call; 2. Make the output type of `CAST(X'6b7975756269':BINARY(6)):BINARY(1)` BINARY(6), which equals the type of the RexLiteral `X'6b7975756269'`. was (Author: csq): I have encountered the same problem in the latest master branch (1.17-SNAPSHOT). !screenshot-1.png! We can see that the result type of the Encode('kyuubi', 'UTF-8') call is BINARY(1). However, the type of the RexLiteral `X'6b7975756269'` is BINARY(6). According to [FLINK-24419 title|https://issues.apache.org/jira/browse/FLINK-24419] {code:java} /* Example generated code for BINARY(2): // legacy behavior ((byte[])(inputValue)) // new behavior ((((byte[])(inputValue)).length == 2) ? (((byte[])(inputValue))) : (java.util.Arrays.copyOf(((byte[])(inputValue)), 2))) */ {code} The BINARY(6) is trimmed to BINARY(1) in BinaryToBinaryCastRule during the generation of the CAST call. There are two possible solutions: 1. Apply the legacy cast behaviour with LegacyCastBehaviour.ENABLED in TableConfig when it is an ENCODE call; 2. Make the output type of `CAST(X'6b7975756269':BINARY(6)):BINARY(1)` BINARY(6), which equals the type of the RexLiteral `X'6b7975756269'`. > ENCODE return error > --- > > Key: FLINK-28882 > URL: https://issues.apache.org/jira/browse/FLINK-28882 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.15.0 >Reporter: Luning Wang >Assignee: Shuiqiang Chen >Priority: Major > Attachments: screenshot-1.png > > > Run the following in the SQL Client: it will return 'k' rather than 'kyuubi', but > it returned 'kyuubi' in the 1.14 version. > {code:java} > select encode('kyuubi', 'UTF-8') {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
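As a user-side illustration of solution 1 (hedged: this assumes the table.exec.legacy-cast-behaviour option available in Flink 1.15+, and setting it relaxes cast behavior globally rather than only for the ENCODE call):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EncodeLegacyCastSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Legacy cast behaviour skips the BINARY(n) length trimming described
        // above, so the full 6-byte value survives instead of being cut to
        // 1 byte. Note this flips cast behaviour for the whole pipeline.
        tEnv.getConfig().set("table.exec.legacy-cast-behaviour", "ENABLED");
        tEnv.executeSql("SELECT ENCODE('kyuubi', 'UTF-8')").print();
    }
}
{code}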
[jira] [Comment Edited] (FLINK-28882) ENCODE return error
[ https://issues.apache.org/jira/browse/FLINK-28882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600556#comment-17600556 ] Shuiqiang Chen edited comment on FLINK-28882 at 9/6/22 2:56 AM: I have encountered the same problem in the latest master branch (1.17-SNAPSHOT). !screenshot-1.png! We can see that the result type of the Encode('kyuubi', 'UTF-8') call is BINARY(1). However, the type of the RexLiteral `X'6b7975756269'` is BINARY(6). According to [FLINK-24419 title|https://issues.apache.org/jira/browse/FLINK-24419] {code:java} /* Example generated code for BINARY(2): // legacy behavior ((byte[])(inputValue)) // new behavior ((((byte[])(inputValue)).length == 2) ? (((byte[])(inputValue))) : (java.util.Arrays.copyOf(((byte[])(inputValue)), 2))) */ {code} The BINARY(6) is trimmed to BINARY(1) in BinaryToBinaryCastRule during the generation of the CAST call. There are two possible solutions: 1. Apply the legacy cast behaviour with LegacyCastBehaviour.ENABLED in TableConfig when it is an ENCODE call; 2. Make the output type of `CAST(X'6b7975756269':BINARY(6)):BINARY(1)` BINARY(6), which equals the type of the RexLiteral `X'6b7975756269'`. was (Author: csq): I have encountered the same problem in the latest master branch (1.17-SNAPSHOT). !screenshot-1.png! We can see that the result type of the Encode('kyuubi', 'UTF-8') call is BINARY(1). However, the type of the RexLiteral `X'6b7975756269'` is BINARY(6). According to [FLINK-24419 title|https://issues.apache.org/jira/browse/FLINK-24419] {code:java} /* Example generated code for BINARY(2): // legacy behavior ((byte[])(inputValue)) // new behavior ((((byte[])(inputValue)).length == 2) ? (((byte[])(inputValue))) : (java.util.Arrays.copyOf(((byte[])(inputValue)), 2))) */ {code} The BINARY(6) is trimmed to BINARY(1) in BinaryToBinaryCastRule during the generation of the cast call. There are two possible solutions: 1. Apply the legacy cast behaviour with LegacyCastBehaviour.ENABLE in TableConfig when it is an ENCODE call; 2. Make the output type of `CAST(X'6b7975756269':BINARY(6)):BINARY(1)` BINARY(6), which equals the type of the RexLiteral `X'6b7975756269'`. > ENCODE return error > --- > > Key: FLINK-28882 > URL: https://issues.apache.org/jira/browse/FLINK-28882 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.15.0 >Reporter: Luning Wang >Priority: Major > Attachments: screenshot-1.png > > > Run the following in the SQL Client: it will return 'k' rather than 'kyuubi', but > it returned 'kyuubi' in the 1.14 version. > {code:java} > select encode('kyuubi', 'UTF-8') {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28882) ENCODE return error
[ https://issues.apache.org/jira/browse/FLINK-28882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600556#comment-17600556 ] Shuiqiang Chen commented on FLINK-28882: I have encountered the same problem in the latest master branch (1.17-SNAPSHOT). !screenshot-1.png! We can see that the result type of the Encode('kyuubi', 'UTF-8') call is BINARY(1). However, the type of the RexLiteral `X'6b7975756269'` is BINARY(6). According to [FLINK-24419 title|https://issues.apache.org/jira/browse/FLINK-24419] {code:java} /* Example generated code for BINARY(2): // legacy behavior ((byte[])(inputValue)) // new behavior ((((byte[])(inputValue)).length == 2) ? (((byte[])(inputValue))) : (java.util.Arrays.copyOf(((byte[])(inputValue)), 2))) */ {code} The BINARY(6) is trimmed to BINARY(1) in BinaryToBinaryCastRule during the generation of the cast call. There are two possible solutions: 1. Apply the legacy cast behaviour with LegacyCastBehaviour.ENABLE in TableConfig when it is an ENCODE call; 2. Make the output type of `CAST(X'6b7975756269':BINARY(6)):BINARY(1)` BINARY(6), which equals the type of the RexLiteral `X'6b7975756269'`. > ENCODE return error > --- > > Key: FLINK-28882 > URL: https://issues.apache.org/jira/browse/FLINK-28882 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.15.0 >Reporter: Luning Wang >Priority: Major > Attachments: screenshot-1.png > > > Run the following in the SQL Client: it will return 'k' rather than 'kyuubi', but > it returned 'kyuubi' in the 1.14 version. > {code:java} > select encode('kyuubi', 'UTF-8') {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28882) ENCODE return error
[ https://issues.apache.org/jira/browse/FLINK-28882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-28882: --- Attachment: screenshot-1.png > ENCODE return error > --- > > Key: FLINK-28882 > URL: https://issues.apache.org/jira/browse/FLINK-28882 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.15.0 >Reporter: Luning Wang >Priority: Major > Attachments: screenshot-1.png > > > Run the following in the SQL Client: it will return 'k' rather than 'kyuubi', but > it returned 'kyuubi' in the 1.14 version. > {code:java} > select encode('kyuubi', 'UTF-8') {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28988) Incorrect result for filter after temporal join
[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600199#comment-17600199 ] Shuiqiang Chen commented on FLINK-28988: Hi, [~jark], could you please help evaluate the solution and review the PR? Thanks! > Incorrect result for filter after temporal join > --- > > Key: FLINK-28988 > URL: https://issues.apache.org/jira/browse/FLINK-28988 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.15.1 >Reporter: Xuannan Su >Assignee: Shuiqiang Chen >Priority: Major > Labels: pull-request-available > > The following code can reproduce the case > > {code:java} > public class TemporalJoinSQLExample1 { > public static void main(String[] args) throws Exception { > // set up the Java DataStream API > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // set up the Java Table API > final StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env); > final DataStreamSource<Tuple3<Integer, String, Instant>> ds = > env.fromElements( > new Tuple3<>(0, "online", Instant.ofEpochMilli(0)), > new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)), > new Tuple3<>(0, "online", Instant.ofEpochMilli(20))); > final Table table = > tableEnv.fromDataStream( > ds, > Schema.newBuilder() > .column("f0", DataTypes.INT()) > .column("f1", DataTypes.STRING()) > .column("f2", > DataTypes.TIMESTAMP_LTZ(3)) > .watermark("f2", "f2 - INTERVAL '2' > SECONDS") > .build()) > .as("id", "state", "ts"); > tableEnv.createTemporaryView("source_table", table); > final Table dedupeTable = > tableEnv.sqlQuery( > "SELECT * FROM (" > + " SELECT *, ROW_NUMBER() OVER (PARTITION BY > id ORDER BY ts DESC) AS row_num FROM source_table" > + ") WHERE row_num = 1"); > tableEnv.createTemporaryView("versioned_table", dedupeTable); > DataStreamSource<Tuple2<Integer, Instant>> event = > env.fromElements( > new Tuple2<>(0, Instant.ofEpochMilli(0)), > new Tuple2<>(0, Instant.ofEpochMilli(5)), > new Tuple2<>(0, Instant.ofEpochMilli(10)), > new Tuple2<>(0, Instant.ofEpochMilli(15)), > new Tuple2<>(0, Instant.ofEpochMilli(20)), > new Tuple2<>(0, Instant.ofEpochMilli(25))); > final Table eventTable = > tableEnv.fromDataStream( > event, > Schema.newBuilder() > .column("f0", DataTypes.INT()) > .column("f1", > DataTypes.TIMESTAMP_LTZ(3)) > .watermark("f1", "f1 - INTERVAL '2' > SECONDS") > .build()) > .as("id", "ts"); > tableEnv.createTemporaryView("event_table", eventTable); > final Table result = > tableEnv.sqlQuery( > "SELECT * FROM event_table" > + " LEFT JOIN versioned_table FOR SYSTEM_TIME > AS OF event_table.ts" > + " ON event_table.id = versioned_table.id"); > result.execute().print(); > result.filter($("state").isEqual("online")).execute().print(); > } > } {code} > > The result of the temporal join is the following: > |op| id| ts| id0| > state| ts0| row_num| > |+I| 0|1970-01-01 08:00:00.000| 0| > online|1970-01-01 08:00:00.000| 1| > |+I| 0|1970-01-01 08:00:00.005| 0| > online|1970-01-01 08:00:00.000| 1| > |+I| 0|1970-01-01 08:00:00.010| 0| > offline|1970-01-01 08:00:00.010| 1| > |+I| 0|1970-01-01 08:00:00.015| 0| > offline|1970-01-01 08:00:00.010| 1| > |+I| 0|1970-01-01 08:00:00.020| 0| > online|1970-01-01 08:00:00.020| 1| > |+I|
[jira] [Comment Edited] (FLINK-28988) Incorrect result for filter after temporal join
[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600135#comment-17600135 ] Shuiqiang Chen edited comment on FLINK-28988 at 9/5/22 3:51 AM: - TLDR: The filters in aboveFilter should not be pushed down into the right table when it is a temporal join. When there is no filter after the temporal join, the query is explained as below: {code:xml} == Abstract Syntax Tree == LogicalProject(id=[$0], ts=[$1], id0=[$2], state=[$3], ts0=[$4], row_num=[$5]) +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}]) :- LogicalProject(id=[AS($0, _UTF-16LE'id')], ts=[AS($1, _UTF-16LE'ts')]) : +- LogicalWatermarkAssigner(rowtime=[f1], watermark=[-($1, 2000:INTERVAL SECOND)]) : +- LogicalTableScan(table=[[*anonymous_datastream_source$2*]]) +- LogicalFilter(condition=[=($cor0.id, $0)]) +- LogicalSnapshot(period=[$cor0.ts]) +- LogicalProject(id=[$0], state=[$1], ts=[$2], row_num=[$3]) +- LogicalFilter(condition=[=($3, 1)]) +- LogicalProject(id=[AS($0, _UTF-16LE'id')], state=[AS($1, _UTF-16LE'state')], ts=[AS($2, _UTF-16LE'ts')], row_num=[ROW_NUMBER() OVER (PARTITION BY AS($0, _UTF-16LE'id') ORDER BY AS($2, _UTF-16LE'ts') DESC NULLS LAST)]) +- LogicalWatermarkAssigner(rowtime=[f2], watermark=[-($2, 2000:INTERVAL SECOND)]) +- LogicalTableScan(table=[[*anonymous_datastream_source$1*]]) == Optimized Physical Plan == Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num]) +- TemporalJoin(joinType=[LeftOuterJoin], where=[AND(=(id, id0), __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num]) :- Exchange(distribution=[hash[id]]) : +- Calc(select=[f0 AS id, f1 AS ts]) : +- WatermarkAssigner(rowtime=[f1], watermark=[-(f1, 2000:INTERVAL SECOND)]) :+- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1]) +- Exchange(distribution=[hash[id]]) +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1:BIGINT AS row_num]) +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) +- Exchange(distribution=[hash[$0]]) +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2]) +- WatermarkAssigner(rowtime=[f2], watermark=[-(f2, 2000:INTERVAL SECOND)]) +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2]) == Optimized Execution Plan == Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num]) +- TemporalJoin(joinType=[LeftOuterJoin], where=[((id = id0) AND __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num]) :- Exchange(distribution=[hash[id]]) : +- Calc(select=[f0 AS id, f1 AS ts]) : +- WatermarkAssigner(rowtime=[f1], watermark=[(f1 - 2000:INTERVAL SECOND)]) :+- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1]) +- Exchange(distribution=[hash[id]]) +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1 AS row_num]) +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) +- Exchange(distribution=[hash[$0]]) +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2]) +- WatermarkAssigner(rowtime=[f2], watermark=[(f2 - 2000:INTERVAL SECOND)]) +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2]) {code} And after the FlinkChangelogModeInferenceProgram, the UpdateKindTrait of Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) will become [ONLY_UPDATE_AFTER]. Therefore, during execution runtime, the rightSortedState in TemporalRowTimeJoinOperator contains the following rows: [+I, 0, online, 1970-01-01 08:00:00.000] [+I, 0, offline, 1970-01-01 08:00:00.010] [+I, 0, online, 1970-01-01 08:00:00.020] So we can get the expected temporal join result: [+I,0,1970-01-01 08:00:00.000,0,online,1970-01-01 08:00:00.000,1] [+I,0,1970-01-01 08:00:00.005,0,online,1970-01-01 08:00:00.000,1] [+I,0,1970-01-01 08:00:00.010,0,offline,1970-01-01 08:00:00.010,1] [+I,0,1970-01-01 08:00:00.015,0,offline,1970-01-01 08:00:00.010,1] [+I,0,1970-01-01 08:00:00.020,0,online,1970-01-01 08:00:00.020,1] [+I,0,1970-01-01 08:00:00.025,0,online,1970-01-01 08:00:00.020,1] However, if the filter is pushed down into the right table, the right sorted state becomes: [+I, 0, online, 1970-01-01 08:00:00.000] [+I, 0, online, 1970-01-01 08:00:00.020] and the temporal join result becomes: [+I,0,1970-01-01 08:00:00.000,0,online,1970-01-01 08:00:00.000,1]
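To see why the pushdown is unsafe, it helps to state the intended query shape explicitly. The sketch below (table names taken from the reproduction in this ticket; the expected rows are the ones derived in the comment above, not re-verified here) keeps the predicate on the join output, which requires the versioned state to retain the full history:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinFilterSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // event_table and versioned_table are assumed to be registered as in
        // the reproduction code from this ticket's description.
        // Filtering the join *output* preserves the intended semantics: the
        // right-side state still holds every version ('online' at ts=0,
        // 'offline' at ts=10, 'online' at ts=20), and only emitted rows are
        // filtered. Pushing the predicate into versioned_table would instead
        // drop the 'offline' version from state, so the events at ts=10/15
        // would match the stale 'online' row, as analyzed above.
        tEnv.executeSql(
                "SELECT * FROM ("
                        + " SELECT e.id, e.ts, v.state"
                        + " FROM event_table AS e"
                        + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF e.ts AS v"
                        + " ON e.id = v.id"
                        + ") WHERE state = 'online'").print();
    }
}
{code}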
[jira] [Comment Edited] (FLINK-28988) Incorrect result for filter after temporal join
[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600135#comment-17600135 ] Shuiqiang Chen edited comment on FLINK-28988 at 9/5/22 3:29 AM: TLDR: The filters in aboveFilter should not be pushed down into right table when it is a temporal join. when there's no filter after temporal join, the query is explained as below: {code:xml} == Abstract Syntax Tree == LogicalProject(id=[$0], ts=[$1], id0=[$2], state=[$3], ts0=[$4], row_num=[$5]) +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}]) :- LogicalProject(id=[AS($0, _UTF-16LE'id')], ts=[AS($1, _UTF-16LE'ts')]) : +- LogicalWatermarkAssigner(rowtime=[f1], watermark=[-($1, 2000:INTERVAL SECOND)]) : +- LogicalTableScan(table=[[*anonymous_datastream_source$2*]]) +- LogicalFilter(condition=[=($cor0.id, $0)]) +- LogicalSnapshot(period=[$cor0.ts]) +- LogicalProject(id=[$0], state=[$1], ts=[$2], row_num=[$3]) +- LogicalFilter(condition=[=($3, 1)]) +- LogicalProject(id=[AS($0, _UTF-16LE'id')], state=[AS($1, _UTF-16LE'state')], ts=[AS($2, _UTF-16LE'ts')], row_num=[ROW_NUMBER() OVER (PARTITION BY AS($0, _UTF-16LE'id') ORDER BY AS($2, _UTF-16LE'ts') DESC NULLS LAST)]) +- LogicalWatermarkAssigner(rowtime=[f2], watermark=[-($2, 2000:INTERVAL SECOND)]) +- LogicalTableScan(table=[[*anonymous_datastream_source$1*]]) == Optimized Physical Plan == Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num]) +- TemporalJoin(joinType=[LeftOuterJoin], where=[AND(=(id, id0), __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num]) :- Exchange(distribution=[hash[id]]) : +- Calc(select=[f0 AS id, f1 AS ts]) : +- WatermarkAssigner(rowtime=[f1], watermark=[-(f1, 2000:INTERVAL SECOND)]) :+- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1]) +- Exchange(distribution=[hash[id]]) +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1:BIGINT AS row_num]) +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) +- Exchange(distribution=[hash[$0]]) +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2]) +- WatermarkAssigner(rowtime=[f2], watermark=[-(f2, 2000:INTERVAL SECOND)]) +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2]) == Optimized Execution Plan == Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num]) +- TemporalJoin(joinType=[LeftOuterJoin], where=[((id = id0) AND __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num]) :- Exchange(distribution=[hash[id]]) : +- Calc(select=[f0 AS id, f1 AS ts]) : +- WatermarkAssigner(rowtime=[f1], watermark=[(f1 - 2000:INTERVAL SECOND)]) :+- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1]) +- Exchange(distribution=[hash[id]]) +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1 AS row_num]) +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) +- Exchange(distribution=[hash[$0]]) +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2]) +- WatermarkAssigner(rowtime=[f2], watermark=[(f2 - 2000:INTERVAL SECOND)]) +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2]) {code} And after the FlinkChangelogModeInferenceProgram, the UpdateKindTrait of Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) will be 
come [ONLY_UPDATE_AFTER]. Therefore, during execution runtime, the rightSortedState in TemporalRowTimeJoinOperator contains the following rows: [+I, 0, online, 1970-01-01 08:00:00.000] [+I, 0, offline, 1970-01-01 08:00:00.010] [+I, 0, online, 1970-01-01 08:00:00.020] So we can get the expected temporal join result: [+I,0,1970-01-01 08:00:00.000,0,online,970-01-01 08:00:00.000 ,1] [+I,0,1970-01-01 08:00:00.005,0,online,1970-01-01 08:00:00.000,1] [+I,0,1970-01-01 08:00:00.0010,0,offline,1970-01-01 08:00:00.010,1] [+I,0,1970-01-01 08:00:00.0015,0,offline,1970-01-01 08:00:00.010,1] [+I,0,1970-01-01 08:00:00.020,0,online,1970-01-01 08:00:00.020,1] [+I,0,1970-01-01 08:00:00.025,0,online,1970-01-01 08:00:00.020,1] However, if the filter was pushed down into the right table, the right sorted state will bocome: [+I, 0, online, 1970-01-01 08:00:00.000] [+I, 0, online, 1970-01-01 08:00:00.020] and the temporal join result will become: [+I,0,1970-01-01 08:00:00.000,0,online,970-01-01 08:00:00.000 ,1]
[jira] [Comment Edited] (FLINK-28988) Incorrect result for filter after temporal join
[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600135#comment-17600135 ] Shuiqiang Chen edited comment on FLINK-28988 at 9/5/22 3:28 AM: TLTD: The filters in aboveFilter should not be pushed down into right table when it is a temporal join. when there's no filter after temporal join, the query is explained as below: {code:xml} == Abstract Syntax Tree == LogicalProject(id=[$0], ts=[$1], id0=[$2], state=[$3], ts0=[$4], row_num=[$5]) +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}]) :- LogicalProject(id=[AS($0, _UTF-16LE'id')], ts=[AS($1, _UTF-16LE'ts')]) : +- LogicalWatermarkAssigner(rowtime=[f1], watermark=[-($1, 2000:INTERVAL SECOND)]) : +- LogicalTableScan(table=[[*anonymous_datastream_source$2*]]) +- LogicalFilter(condition=[=($cor0.id, $0)]) +- LogicalSnapshot(period=[$cor0.ts]) +- LogicalProject(id=[$0], state=[$1], ts=[$2], row_num=[$3]) +- LogicalFilter(condition=[=($3, 1)]) +- LogicalProject(id=[AS($0, _UTF-16LE'id')], state=[AS($1, _UTF-16LE'state')], ts=[AS($2, _UTF-16LE'ts')], row_num=[ROW_NUMBER() OVER (PARTITION BY AS($0, _UTF-16LE'id') ORDER BY AS($2, _UTF-16LE'ts') DESC NULLS LAST)]) +- LogicalWatermarkAssigner(rowtime=[f2], watermark=[-($2, 2000:INTERVAL SECOND)]) +- LogicalTableScan(table=[[*anonymous_datastream_source$1*]]) == Optimized Physical Plan == Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num]) +- TemporalJoin(joinType=[LeftOuterJoin], where=[AND(=(id, id0), __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num]) :- Exchange(distribution=[hash[id]]) : +- Calc(select=[f0 AS id, f1 AS ts]) : +- WatermarkAssigner(rowtime=[f1], watermark=[-(f1, 2000:INTERVAL SECOND)]) :+- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1]) +- Exchange(distribution=[hash[id]]) +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1:BIGINT AS row_num]) +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) +- Exchange(distribution=[hash[$0]]) +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2]) +- WatermarkAssigner(rowtime=[f2], watermark=[-(f2, 2000:INTERVAL SECOND)]) +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2]) == Optimized Execution Plan == Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num]) +- TemporalJoin(joinType=[LeftOuterJoin], where=[((id = id0) AND __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num]) :- Exchange(distribution=[hash[id]]) : +- Calc(select=[f0 AS id, f1 AS ts]) : +- WatermarkAssigner(rowtime=[f1], watermark=[(f1 - 2000:INTERVAL SECOND)]) :+- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1]) +- Exchange(distribution=[hash[id]]) +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1 AS row_num]) +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) +- Exchange(distribution=[hash[$0]]) +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2]) +- WatermarkAssigner(rowtime=[f2], watermark=[(f2 - 2000:INTERVAL SECOND)]) +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2]) {code} And after the FlinkChangelogModeInferenceProgram, the UpdateKindTrait of Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) will be 
come [ONLY_UPDATE_AFTER]. Therefore, during execution runtime, the rightSortedState in TemporalRowTimeJoinOperator contains the following rows: [+I, 0, online, 1970-01-01 08:00:00.000] [+I, 0, offline, 1970-01-01 08:00:00.010] [+I, 0, online, 1970-01-01 08:00:00.020] So we can get the expected temporal join result: [+I,0,1970-01-01 08:00:00.000,0,online,970-01-01 08:00:00.000 ,1] [+I,0,1970-01-01 08:00:00.005,0,online,1970-01-01 08:00:00.000,1] [+I,0,1970-01-01 08:00:00.0010,0,offline,1970-01-01 08:00:00.010,1] [+I,0,1970-01-01 08:00:00.0015,0,offline,1970-01-01 08:00:00.010,1] [+I,0,1970-01-01 08:00:00.020,0,online,1970-01-01 08:00:00.020,1] [+I,0,1970-01-01 08:00:00.025,0,online,1970-01-01 08:00:00.020,1] However, if the filter was pushed down into the right table, the right sorted state will bocome: [+I, 0, online, 1970-01-01 08:00:00.000] [+I, 0, online, 1970-01-01 08:00:00.020] and the temporal join result will become: [+I,0,1970-01-01 08:00:00.000,0,online,970-01-01 08:00:00.000 ,1]
[jira] [Comment Edited] (FLINK-28988) Incorrect result for filter after temporal join
[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600135#comment-17600135 ] Shuiqiang Chen edited comment on FLINK-28988 at 9/5/22 3:20 AM: TLTD: The filters in aboveFilter should not be pushed down into right table when it is a temporal join. when there's no filter after temporal join, the query is explained as below: {code:xml} == Abstract Syntax Tree == LogicalProject(id=[$0], ts=[$1], id0=[$2], state=[$3], ts0=[$4], row_num=[$5]) +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}]) :- LogicalProject(id=[AS($0, _UTF-16LE'id')], ts=[AS($1, _UTF-16LE'ts')]) : +- LogicalWatermarkAssigner(rowtime=[f1], watermark=[-($1, 2000:INTERVAL SECOND)]) : +- LogicalTableScan(table=[[*anonymous_datastream_source$2*]]) +- LogicalFilter(condition=[=($cor0.id, $0)]) +- LogicalSnapshot(period=[$cor0.ts]) +- LogicalProject(id=[$0], state=[$1], ts=[$2], row_num=[$3]) +- LogicalFilter(condition=[=($3, 1)]) +- LogicalProject(id=[AS($0, _UTF-16LE'id')], state=[AS($1, _UTF-16LE'state')], ts=[AS($2, _UTF-16LE'ts')], row_num=[ROW_NUMBER() OVER (PARTITION BY AS($0, _UTF-16LE'id') ORDER BY AS($2, _UTF-16LE'ts') DESC NULLS LAST)]) +- LogicalWatermarkAssigner(rowtime=[f2], watermark=[-($2, 2000:INTERVAL SECOND)]) +- LogicalTableScan(table=[[*anonymous_datastream_source$1*]]) == Optimized Physical Plan == Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num]) +- TemporalJoin(joinType=[LeftOuterJoin], where=[AND(=(id, id0), __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num]) :- Exchange(distribution=[hash[id]]) : +- Calc(select=[f0 AS id, f1 AS ts]) : +- WatermarkAssigner(rowtime=[f1], watermark=[-(f1, 2000:INTERVAL SECOND)]) :+- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1]) +- Exchange(distribution=[hash[id]]) +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1:BIGINT AS row_num]) +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) +- Exchange(distribution=[hash[$0]]) +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2]) +- WatermarkAssigner(rowtime=[f2], watermark=[-(f2, 2000:INTERVAL SECOND)]) +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2]) == Optimized Execution Plan == Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num]) +- TemporalJoin(joinType=[LeftOuterJoin], where=[((id = id0) AND __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num]) :- Exchange(distribution=[hash[id]]) : +- Calc(select=[f0 AS id, f1 AS ts]) : +- WatermarkAssigner(rowtime=[f1], watermark=[(f1 - 2000:INTERVAL SECOND)]) :+- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1]) +- Exchange(distribution=[hash[id]]) +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1 AS row_num]) +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) +- Exchange(distribution=[hash[$0]]) +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2]) +- WatermarkAssigner(rowtime=[f2], watermark=[(f2 - 2000:INTERVAL SECOND)]) +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2]) {code} And after the FlinkChangelogModeInferenceProgram, the UpdateKindTrait of Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) will be 
come [ONLY_UPDATE_AFTER]. Therefore, during execution runtime, the rightSortedState in TemporalRowTimeJoinOperator contains the following rows: [+I, 0, online, 1970-01-01 08:00:00.000] [+I, 0, offline, 1970-01-01 08:00:00.010] [+I, 0, online, 1970-01-01 08:00:00.020] So we can get the expected temporal join result: [+I,0,1970-01-01 08:00:00.000,0,online,970-01-01 08:00:00.000 ,1] [+I,0,1970-01-01 08:00:00.005,0,online,1970-01-01 08:00:00.000,1] [+I,0,1970-01-01 08:00:00.0010,0,offline,1970-01-01 08:00:00.010,1] [+I,0,1970-01-01 08:00:00.0015,0,offline,1970-01-01 08:00:00.010,1] [+I,0,1970-01-01 08:00:00.020,0,online,1970-01-01 08:00:00.020,1] [+I,0,1970-01-01 08:00:00.025,0,online,1970-01-01 08:00:00.020,1] However, if the filter was pushed down into the right table, the right sorted state will bocome: [+I, 0, online, 1970-01-01 08:00:00.000] [+I, 0, online, 1970-01-01 08:00:00.020] and the temporal join result will become: [+I,0,1970-01-01 08:00:00.000,0,online,970-01-01 08:00:00.000 ,1]
[jira] [Updated] (FLINK-29146) User set job configuration can not be retrieved from JobGraph and ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-29146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-29146: --- Description: Currently, when building an ExecutionGraph, we need to set job-specific information (like the job id, job name, job configuration, etc.), and most of it comes from the JobGraph. But I find that the configuration in JobGraph is a new Configuration instance created when the JobGraph is built, and it does not contain any user-set configuration. As a result, we are not able to retrieve the user-specified job configuration from an ExecutionGraph built from a JobGraph at runtime.

BTW, in StreamExecutionEnvironment, it seems that job configurations that are not contained in the built-in options are ignored when calling StreamExecutionEnvironment.configure(ReadableConfig[, ClassLoader]). However, they are included when constructing a StreamExecutionEnvironment, which seems a bit inconsistent. Is it by design?

{code:java}
Configuration configuration = new Configuration();

// These configured strings will take effect.
configuration.setString("k1", "v1");
configuration.setString("k2", "v2");
configuration.setString("k3", "v3");
configuration.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 30L);

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);

// These configured strings will be ignored.
configuration.setString("k4", "v4");
configuration.setString("k5", "v5");
configuration.setString("k6", "v6");
env.configure(configuration);
{code}

was: Currently, when building an ExecutionGraph, it requires to set the job specific information (like job id, job name, job configuration, etc) and most of them are from JobGraph.But I find that the configuraiton in JobGraph is a new Configuration instance that does not contain any user set configuration. As a result, we are not able retrieve the use specified job configuration in ExecutionGraph built from JobGraph during runtime execution. BTW, in StreamExecutionEnvironment, it seems that job configuraitons that not contained in built-in options will be igored when calling StreamExecutionEnvironment.configure(ReadableConfig[, ClassLoader]). However, it will be included when constructing a StreamExecutionEnvironment, which seems a bit inconsistent. {code:java} Configuration configuration = new Configuration(); // These configured string will take effect. configuration.setString("k1", "v1"); configuration.setString("k2", "v2"); configuration.setString("k3", "v3"); configuration.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 30L); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); // These configured string will be ignored. configuration.setString("k4", "v4"); configuration.setString("k5", "v5"); configuration.setString("k6", "v6"); env.configure(configuration); {code}

> User set job configuration can not be retrieved from JobGraph and
> ExecutionGraph
> -
>
> Key: FLINK-29146
> URL: https://issues.apache.org/jira/browse/FLINK-29146
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.16.0
> Reporter: Shuiqiang Chen
> Priority: Major
>
> Currently, when building an ExecutionGraph, we need to set job-specific
> information (like the job id, job name, job configuration, etc.), and most
> of it comes from the JobGraph. But I find that the configuration in JobGraph
> is a new Configuration instance created when the JobGraph is built, and it
> does not contain any user-set configuration. As a result, we are not able to
> retrieve the user-specified job configuration from an ExecutionGraph built
> from a JobGraph at runtime.
> BTW, in StreamExecutionEnvironment, it seems that job configurations that
> are not contained in the built-in options are ignored when calling
> StreamExecutionEnvironment.configure(ReadableConfig[, ClassLoader]).
> However, they are included when constructing a StreamExecutionEnvironment,
> which seems a bit inconsistent. Is it by design?
> {code:java}
> Configuration configuration = new Configuration();
> // These configured strings will take effect.
> configuration.setString("k1", "v1");
> configuration.setString("k2", "v2");
> configuration.setString("k3", "v3");
> configuration.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 30L);
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> // These configured strings will be ignored.
> configuration.setString("k4", "v4");
> configuration.setString("k5", "v5");
> configuration.setString("k6", "v6");
> env.configure(configuration);
> {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29146) User set job configuration can not be retrieved from JobGraph and ExecutionGraph
Shuiqiang Chen created FLINK-29146: -- Summary: User set job configuration can not be retrieved from JobGraph and ExecutionGraph Key: FLINK-29146 URL: https://issues.apache.org/jira/browse/FLINK-29146 Project: Flink Issue Type: Bug Reporter: Shuiqiang Chen

Currently, when building an ExecutionGraph, we need to set job-specific information (like the job id, job name, job configuration, etc.), and most of it comes from the JobGraph. But I find that the configuration in JobGraph is a new Configuration instance that does not contain any user-set configuration. As a result, we are not able to retrieve the user-specified job configuration from an ExecutionGraph built from a JobGraph at runtime.

BTW, in StreamExecutionEnvironment, it seems that job configurations that are not contained in the built-in options are ignored when calling StreamExecutionEnvironment.configure(ReadableConfig[, ClassLoader]). However, they are included when constructing a StreamExecutionEnvironment, which seems a bit inconsistent.

{code:java}
Configuration configuration = new Configuration();

// These configured strings will take effect.
configuration.setString("k1", "v1");
configuration.setString("k2", "v2");
configuration.setString("k3", "v3");
configuration.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 30L);

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);

// These configured strings will be ignored.
configuration.setString("k4", "v4");
configuration.setString("k5", "v5");
configuration.setString("k6", "v6");
env.configure(configuration);
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28988) Incorrect result for filter after temporal join
[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17597255#comment-17597255 ] Shuiqiang Chen commented on FLINK-28988: [~xuannan] Thank you for reporting the issue. I have also reproduced the output with the code snippet you provided on the latest master branch. I would like to take a deep dive into the issue.

> Incorrect result for filter after temporal join
> ---
>
> Key: FLINK-28988
> URL: https://issues.apache.org/jira/browse/FLINK-28988
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.15.1
> Reporter: Xuannan Su
> Priority: Major
>
> The following code can reproduce the case
>
> {code:java}
> public class TemporalJoinSQLExample1 {
>
>     public static void main(String[] args) throws Exception {
>         // set up the Java DataStream API
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // set up the Java Table API
>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>         final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
>                 env.fromElements(
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
>                         new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
>
>         final Table table =
>                 tableEnv.fromDataStream(
>                                 ds,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.STRING())
>                                         .column("f2", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f2", "f2 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "state", "ts");
>         tableEnv.createTemporaryView("source_table", table);
>
>         final Table dedupeTable =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM ("
>                                 + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
>                                 + ") WHERE row_num = 1");
>         tableEnv.createTemporaryView("versioned_table", dedupeTable);
>
>         DataStreamSource<Tuple2<Integer, Instant>> event =
>                 env.fromElements(
>                         new Tuple2<>(0, Instant.ofEpochMilli(0)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(5)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(10)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(15)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(20)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(25)));
>
>         final Table eventTable =
>                 tableEnv.fromDataStream(
>                                 event,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f1", "f1 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "ts");
>         tableEnv.createTemporaryView("event_table", eventTable);
>
>         final Table result =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM event_table"
>                                 + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
>                                 + " ON event_table.id = versioned_table.id");
>         result.execute().print();
>         result.filter($("state").isEqual("online")).execute().print();
>     }
> } {code}
>
> The result of the temporal join is the following:
> |op|id|ts|id0|state|ts0|row_num|
> |+I|0|1970-01-01 08:00:00.000|0|online|1970-01-01 08:00:00.000|1|
> |+I|0|1970-01-01 08:00:00.005|0|online|1970-01-01 08:00:00.000|1|
> |+I|0|1970-01-01 08:00:00.010|0|offline|1970-01-01 08:00:00.010|1|
> |+I|0|1970-01-01 08:00:00.015|0|offline|1970-01-01 08:00:00.010|1|
> |+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|
[jira] [Updated] (FLINK-28119) Update the document for Contribute Documentation
[ https://issues.apache.org/jira/browse/FLINK-28119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-28119: --- Description: Following the [Contribute Documentation|https://flink.apache.org/contributing/contribute-documentation.html] guide, it requires installing Hugo when executing
{code:sh}
cd docs
./build_docs.sh -p
{code}
And the web server is actually accessed at http://localhost:1313.
Maybe we should:
1. List the prerequisites before building the doc.
2. Correct the web server address.

was: Following the [Contribute Documentation|https://flink.apache.org/contributing/contribute-documentation.html], it requires me to install Hugo when executing {code:shell} cd docs ./build_docs.sh -p {code} . And the web server is actually accessed by http://localhost:1313 Maybe we should: 1. List the prerequisites before building the doc. 2. Correct the web server address.

> Update the document for Contribute Documentation
>
>
> Key: FLINK-28119
> URL: https://issues.apache.org/jira/browse/FLINK-28119
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.16.0
> Reporter: Shuiqiang Chen
> Priority: Minor
>
> Following the [Contribute Documentation|https://flink.apache.org/contributing/contribute-documentation.html] guide, it requires installing Hugo when executing
> {code:sh}
> cd docs
> ./build_docs.sh -p
> {code}
> And the web server is actually accessed at http://localhost:1313.
> Maybe we should:
> 1. List the prerequisites before building the doc.
> 2. Correct the web server address.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28119) Update the document for Contribute Documentation
Shuiqiang Chen created FLINK-28119: -- Summary: Update the document for Contribute Documentation Key: FLINK-28119 URL: https://issues.apache.org/jira/browse/FLINK-28119 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.16.0 Reporter: Shuiqiang Chen

Following the [Contribute Documentation|https://flink.apache.org/contributing/contribute-documentation.html] guide, it requires installing Hugo when executing
{code:shell}
cd docs
./build_docs.sh -p
{code}
And the web server is actually accessed at http://localhost:1313.
Maybe we should:
1. List the prerequisites before building the doc.
2. Correct the web server address.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-27159) Support first_value/last_value in the Table API
[ https://issues.apache.org/jira/browse/FLINK-27159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554388#comment-17554388 ] Shuiqiang Chen commented on FLINK-27159: [~dianfu] Yeah, I would like to align the built-in functions in the Table API with SQL to improve the user experience. > Support first_value/last_value in the Table API > --- > > Key: FLINK-27159 > URL: https://issues.apache.org/jira/browse/FLINK-27159 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Huang Xingbo >Assignee: Shuiqiang Chen >Priority: Major > Labels: pull-request-available > > Currently, first_value/last_value are not supported in Table API. > {code:python} > table.group_by(col("a")) >.select( > col("a"), > call("FIRST_VALUE", col("b"))) > {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
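To make the alignment concrete, here is a sketch of what an aligned Table API call could look like (the first_value expression method below is an assumption for illustration only; the ticket itself shows just the call(...) workaround):

{code:python}
from pyflink.table.expressions import col

# Hypothetical aligned usage: the aggregate would be available directly on
# the expression instead of going through call("FIRST_VALUE", ...).
result = table.group_by(col("a")).select(col("a"), col("b").first_value)
{code}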
[jira] [Commented] (FLINK-27159) Support first_value/last_value in the Table API
[ https://issues.apache.org/jira/browse/FLINK-27159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554076#comment-17554076 ] Shuiqiang Chen commented on FLINK-27159: [~dianfu] Thank you for your attention. I have created a PR for this ticket, would you mind helping to review it? > Support first_value/last_value in the Table API > --- > > Key: FLINK-27159 > URL: https://issues.apache.org/jira/browse/FLINK-27159 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Huang Xingbo >Assignee: Shuiqiang Chen >Priority: Major > Labels: pull-request-available > > Currently, first_value/last_value are not supported in Table API. > {code:python} > table.group_by(col("a")) >.select( > col("a"), > call("FIRST_VALUE", col("b"))) > {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27881) The key(String) in PulsarMessageBuilder returns null
Shuiqiang Chen created FLINK-27881: -- Summary: The key(String) in PulsarMessageBuilder returns null Key: FLINK-27881 URL: https://issues.apache.org/jira/browse/FLINK-27881 Project: Flink Issue Type: Bug Components: Connectors / Pulsar Affects Versions: 1.15.0 Reporter: Shuiqiang Chen The PulsarMessageBuilder.key(String) always returns null, which might cause an NPE in a later call. -- This message was sent by Atlassian Jira (v8.20.7#820007)
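Although the ticket does not include the connector code, the failure mode it describes matches a common fluent-builder bug, sketched below (illustrative only, not the Pulsar connector source):

{code:python}
class MessageBuilder:
    """Illustrative fluent builder exhibiting the bug described above."""

    def __init__(self):
        self._key = None
        self._value = None

    def key(self, key):
        self._key = key
        # Bug: `return self` is missing, so this method returns None and any
        # call chained after it is invoked on None.

    def value(self, value):
        self._value = value
        return self


# MessageBuilder().key("k").value("v") fails: key() returned None, so the
# chained value() call raises an AttributeError on NoneType.
{code}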
[jira] [Commented] (FLINK-27159) Support first_value/last_value in the Table API
[ https://issues.apache.org/jira/browse/FLINK-27159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17524188#comment-17524188 ] Shuiqiang Chen commented on FLINK-27159: Hi [~hxbks2ks], I'm interested in this ticket, could you please assign it to me? > Support first_value/last_value in the Table API > --- > > Key: FLINK-27159 > URL: https://issues.apache.org/jira/browse/FLINK-27159 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Huang Xingbo >Priority: Major > > Currently, first_value/last_value are not supported in Table API. > {code:python} > table.group_by(col("a")) >.select( > col("a"), > call("FIRST_VALUE", col("b"))) > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25231) Update PyFlink to use the new type system
[ https://issues.apache.org/jira/browse/FLINK-25231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17492438#comment-17492438 ] Shuiqiang Chen commented on FLINK-25231: [~dianfu] Sorry for the late reply. This issue is almost done in my local development branch, but there are some test case failures due to unexpected TypeInformation results when applying TypeConversions.fromDataTypeToLegacyInfo(). I'm trying to find a solution to prevent the error as soon as I can. > Update PyFlink to use the new type system > - > > Key: FLINK-25231 > URL: https://issues.apache.org/jira/browse/FLINK-25231 > Project: Flink > Issue Type: Improvement > Components: API / Python >Reporter: Dian Fu >Assignee: Shuiqiang Chen >Priority: Major > Fix For: 1.15.0 > > > Currently, there are a lot of places in PyFlink Table API still using the > legacy type system. We need to revisit this and migrate them to the new type > system(DataType/LogicalType). -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25231) Update PyFlink to use the new type system
[ https://issues.apache.org/jira/browse/FLINK-25231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17459611#comment-17459611 ] Shuiqiang Chen commented on FLINK-25231: [~dianfu] Thank you for reporting this improvement. I would like to take on this task, please assign it to me. > Update PyFlink to use the new type system > - > > Key: FLINK-25231 > URL: https://issues.apache.org/jira/browse/FLINK-25231 > Project: Flink > Issue Type: Improvement > Components: API / Python >Reporter: Dian Fu >Priority: Major > Fix For: 1.15.0 > > > Currently, there are a lot of places in PyFlink Table API still using the > legacy type system. We need to revisit this and migrate them to the new type > system(DataType/LogicalType). -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-21966) Support Kinesis connector in Python DataStream API.
Shuiqiang Chen created FLINK-21966: -- Summary: Support Kinesis connector in Python DataStream API. Key: FLINK-21966 URL: https://issues.apache.org/jira/browse/FLINK-21966 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Shuiqiang Chen Fix For: 1.13.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21559) Python DataStreamTests::test_process_function failed on AZP
[ https://issues.apache.org/jira/browse/FLINK-21559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17295025#comment-17295025 ] Shuiqiang Chen commented on FLINK-21559: Thank you for reporting the issue, I am looking at it. > Python DataStreamTests::test_process_function failed on AZP > --- > > Key: FLINK-21559 > URL: https://issues.apache.org/jira/browse/FLINK-21559 > Project: Flink > Issue Type: Improvement > Components: API / Python >Affects Versions: 1.13.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.13.0 > > > The Python test case {{DataStreamTests::test_process_function}} failed on AZP. > {code} > === short test summary info > > FAILED > pyflink/datastream/tests/test_data_stream.py::DataStreamTests::test_process_function > = 1 failed, 705 passed, 22 skipped, 303 warnings in 583.39s (0:09:43) > == > ERROR: InvocationError for command /__w/3/s/flink-python/.tox/py38/bin/pytest > --durations=20 (exited with code 1) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=13992=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=8d78fe4f-d658-5c70-12f8-4921589024c3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21113) Add State access API for KeyedStream RuntimeContext.
Shuiqiang Chen created FLINK-21113: -- Summary: Add State access API for KeyedStream RuntimeContext. Key: FLINK-21113 URL: https://issues.apache.org/jira/browse/FLINK-21113 Project: Flink Issue Type: Sub-task Reporter: Shuiqiang Chen Add state access API for KeyedStream RuntimeContext so that users can get a ValueState/ListState/MapState via RuntimeContext. -- This message was sent by Atlassian Jira (v8.3.4#803005)
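A minimal sketch of the intended usage described above (the signatures follow the PyFlink API as eventually shipped; treat the details as indicative for this ticket rather than exact):

{code:python}
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor


class CountPerKey(KeyedProcessFunction):

    def open(self, runtime_context: RuntimeContext):
        # Obtain a ValueState handle from the RuntimeContext via a descriptor.
        descriptor = ValueStateDescriptor("count", Types.LONG())
        self.count_state = runtime_context.get_state(descriptor)

    def process_element(self, value, ctx):
        # Read, update, and emit the per-key count.
        current = self.count_state.value() or 0
        self.count_state.update(current + 1)
        yield value, current + 1
{code}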
[jira] [Updated] (FLINK-21111) Support State access in Python DataStream API
[ https://issues.apache.org/jira/browse/FLINK-21111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-21111: --- Affects Version/s: (was: 1.13.0) > Support State access in Python DataStream API > - > > Key: FLINK-21111 > URL: https://issues.apache.org/jira/browse/FLINK-21111 > Project: Flink > Issue Type: Improvement > Components: API / Python >Reporter: Shuiqiang Chen >Priority: Major > Fix For: 1.13.0 > > > This is the umbrella Jira for FLIP-153, which intends to support state access > in Python DataStream API. > FLIP wiki page: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API > In FLIP-130, we have already supported the Python DataStream stateless API so > that users are able to perform some basic data transformations. To implement > more complex data processing, we need to provide state access support. In > this doc, I propose adding state access support to the Python DataStream API > to enable stateful operations on a KeyedStream. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21111) Support State access in Python DataStream API
[ https://issues.apache.org/jira/browse/FLINK-21111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-21111: --- Fix Version/s: 1.13.0 > Support State access in Python DataStream API > - > > Key: FLINK-21111 > URL: https://issues.apache.org/jira/browse/FLINK-21111 > Project: Flink > Issue Type: Improvement > Components: API / Python >Affects Versions: 1.13.0 >Reporter: Shuiqiang Chen >Priority: Major > Fix For: 1.13.0 > > > This is the umbrella Jira for FLIP-153, which intends to support state access > in Python DataStream API. > FLIP wiki page: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API > In FLIP-130, we have already supported the Python DataStream stateless API so > that users are able to perform some basic data transformations. To implement > more complex data processing, we need to provide state access support. In > this doc, I propose adding state access support to the Python DataStream API > to enable stateful operations on a KeyedStream. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21112) Add ValueState/ListState/MapState and corresponding StateDescriptors for Python DataStream API
Shuiqiang Chen created FLINK-21112: -- Summary: Add ValueState/ListState/MapState and corresponding StateDescriptors for Python DataStream API Key: FLINK-21112 URL: https://issues.apache.org/jira/browse/FLINK-21112 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Shuiqiang Chen Fix For: 1.13.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21111) Support State access in Python DataStream API
Shuiqiang Chen created FLINK-21111: -- Summary: Support State access in Python DataStream API Key: FLINK-21111 URL: https://issues.apache.org/jira/browse/FLINK-21111 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.13.0 Reporter: Shuiqiang Chen This is the umbrella Jira for FLIP-153, which intends to support state access in Python DataStream API. FLIP wiki page: https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API In FLIP-130, we have already supported the Python DataStream stateless API so that users are able to perform some basic data transformations. To implement more complex data processing, we need to provide state access support. In this doc, I propose adding state access support to the Python DataStream API to enable stateful operations on a KeyedStream. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20720) Add documentation for ProcessFunction in Python DataStream API
Shuiqiang Chen created FLINK-20720: -- Summary: Add documentation for ProcessFunction in Python DataStream API Key: FLINK-20720 URL: https://issues.apache.org/jira/browse/FLINK-20720 Project: Flink Issue Type: Improvement Components: API / Python, Documentation Reporter: Shuiqiang Chen Fix For: 1.13.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20715) pyflink DataStream filter error.
[ https://issues.apache.org/jira/browse/FLINK-20715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17253388#comment-17253388 ] Shuiqiang Chen commented on FLINK-20715: Hi [~kingreatwill], thank you for reporting the issue. Is this the full stack trace of the error above? I have tested the code snippet you gave and it finished successfully.

> pyflink DataStream filter error.
> 
>
> Key: FLINK-20715
> URL: https://issues.apache.org/jira/browse/FLINK-20715
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, API / Python
> Affects Versions: 1.12.0
> Environment: Flink 1.12
> Reporter: Enter
> Priority: Major
>
> ```
> class MyFilterFunction(FilterFunction):
>     def filter(self, value):
>         return value[0] % 2 == 0
>
> def demo_stream():
>     see = StreamExecutionEnvironment.get_execution_environment()
>     see.set_parallelism(1)
>     ds = see.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')],
>                              type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))
>     ds.filter(MyFilterFunction()).print()
>     ds.print()
>     # Execute the job.
>     see.execute('job1')
>
> if __name__ == '__main__':
>     demo_stream()
> ```
>
> raise Py4JError(
> py4j.protocol.Py4JError: An error occurred while calling o0.__getstate__.
> Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method __getstate__([]) does not exist
> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20647) Use yield to generate output data in ProcessFunction for Python DataStream
Shuiqiang Chen created FLINK-20647: -- Summary: Use yield to generate output data in ProcessFunction for Python DataStream Key: FLINK-20647 URL: https://issues.apache.org/jira/browse/FLINK-20647 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Shuiqiang Chen Fix For: 1.13.0 Currently, users need to use a `collector` to collect the output of their ProcessFunction. We should use yield instead: it is more efficient because it avoids using an array buffer to store the collected data, and it is more Pythonic as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
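A sketch of the proposed style (illustrative):

{code:python}
from pyflink.datastream.functions import ProcessFunction


class SplitWords(ProcessFunction):

    # The function body is a generator: each record is emitted via `yield`
    # instead of being appended to a buffer through a collector.
    def process_element(self, value, ctx):
        for word in value.split(" "):
            yield word
{code}

Because the results are consumed lazily from the generator, no intermediate buffer is needed to hold the collected records.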
[jira] [Created] (FLINK-20621) Refactor the TypeInformation implementation in Python DataStream API
Shuiqiang Chen created FLINK-20621: -- Summary: Refactor the TypeInformation implementation in Python DataStream API Key: FLINK-20621 URL: https://issues.apache.org/jira/browse/FLINK-20621 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Shuiqiang Chen Fix For: 1.12.0 Currently, when users declare a TypeInformation in a Python DataStream job, the corresponding Java TypeInformation instance is created immediately at instantiation time. We should make it lazy, so that the Java TypeInformation instance is created only when TypeInformation.get_java_type_info() is called. -- This message was sent by Atlassian Jira (v8.3.4#803005)
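A minimal sketch of the lazy-initialization idea (field and helper names here are assumptions for illustration, not the actual PyFlink source):

{code:python}
class TypeInformation(object):

    def __init__(self):
        self._j_typeinfo = None  # No Java-side instance is created yet.

    def get_java_type_info(self):
        # Create the Java TypeInformation on first access only, then cache it.
        if self._j_typeinfo is None:
            self._j_typeinfo = self._create_j_typeinfo()
        return self._j_typeinfo

    def _create_j_typeinfo(self):
        # Subclasses would call into the JVM (e.g. via Py4J) here.
        raise NotImplementedError
{code}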
[jira] [Comment Edited] (FLINK-20123) Test native support of PyFlink on Kubernetes
[ https://issues.apache.org/jira/browse/FLINK-20123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17239070#comment-17239070 ] Shuiqiang Chen edited comment on FLINK-20123 at 11/26/20, 5:27 AM: --- Hi [~rmetzger] I found that the native k8s cluster could not be unregistered when executing Python DataStream application in attach mode during test. I have created a [JIRA|https://issues.apache.org/jira/browse/FLINK-20365] for it and will fix it today. was (Author: csq): Hi [~rmetzger] I found that the native k8s cluster could not be unregistered when executing Python DataStream application in attach mode during test. I have created a [JIRA|https://issues.apache.org/jira/browse/FLINK-20365] for it and will fixed it today. > Test native support of PyFlink on Kubernetes > > > Key: FLINK-20123 > URL: https://issues.apache.org/jira/browse/FLINK-20123 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Robert Metzger >Assignee: Shuiqiang Chen >Priority: Critical > Fix For: 1.12.0 > > > > [General Information about the Flink 1.12 release > testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing] > When testing a feature, consider the following aspects: > - Is the documentation easy to understand > - Are the error messages, log messages, APIs etc. easy to understand > - Is the feature working as expected under normal conditions > - Is the feature working / failing as expected with invalid input, induced > errors etc. > If you find a problem during testing, please file a ticket > (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket. > During the testing keep us updated on tests conducted, or please write a > short summary of all things you have tested in the end. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-20365) The native k8s cluster could not be unregistered when executing a Python DataStream application in attach mode.
[ https://issues.apache.org/jira/browse/FLINK-20365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-20365: --- Affects Version/s: (was: 1.12.0) > The native k8s cluster could not be unregistered when executing a Python > DataStream application in attach mode. > - > > Key: FLINK-20365 > URL: https://issues.apache.org/jira/browse/FLINK-20365 > Project: Flink > Issue Type: Bug > Components: API / Python >Reporter: Shuiqiang Chen >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-20365) The native k8s cluster could not be unregistered when executing a Python DataStream application in attach mode.
[ https://issues.apache.org/jira/browse/FLINK-20365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-20365: --- Fix Version/s: 1.12.0 > The native k8s cluster could not be unregistered when executing a Python > DataStream application in attach mode. > - > > Key: FLINK-20365 > URL: https://issues.apache.org/jira/browse/FLINK-20365 > Project: Flink > Issue Type: Bug > Components: API / Python >Reporter: Shuiqiang Chen >Priority: Major > Fix For: 1.12.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20365) The native k8s cluster could not be unregistered when executing a Python DataStream application in attach mode.
Shuiqiang Chen created FLINK-20365: -- Summary: The native k8s cluster could not be unregistered when executing a Python DataStream application in attach mode. Key: FLINK-20365 URL: https://issues.apache.org/jira/browse/FLINK-20365 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.12.0 Reporter: Shuiqiang Chen -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-20199) PyFlink e2e DataStream test failed during stopping kafka services
[ https://issues.apache.org/jira/browse/FLINK-20199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237087#comment-17237087 ] Shuiqiang Chen edited comment on FLINK-20199 at 11/23/20, 3:38 AM: --- [~hxbks2ks] Thank you for reporting this issue. It seems that it failed to stop the Kafka process after the DataStream job finished. I'll keep an eye on this problem. was (Author: csq): [~hxbks2ks] Thank you for reporting this issue. It is because the testing environment's instability that it fail to stop the Kafka process after the DataStream job finished. > PyFlink e2e DataStream test failed during stopping kafka services > - > > Key: FLINK-20199 > URL: https://issues.apache.org/jira/browse/FLINK-20199 > Project: Flink > Issue Type: Bug > Components: API / Python >Reporter: Huang Xingbo >Priority: Major > Fix For: 1.12.0 > > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9667=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529] > > {code} > Nov 17 06:04:35 Reading kafka messages... > Nov 17 06:05:17 Cancelling job 55acfb14436b24b9c6fb0f47d7297f83. > Nov 17 06:05:17 Cancelled job 55acfb14436b24b9c6fb0f47d7297f83. > Nov 17 06:05:18 Stopping taskexecutor daemon (pid: 110438) on host > fv-az598-520. > Nov 17 06:05:24 Stopping standalonesession daemon (pid: 110130) on host > fv-az598-520. > Nov 17 06:05:25 Waiting till process is stopped: pid = 123127 pattern = > 'kafka' > Nov 17 06:05:26 Waiting till process is stopped: pid = 123717 pattern = > 'kafka' > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-19951) PyFlink end-to-end test stuck in "Reading kafka messages"
[ https://issues.apache.org/jira/browse/FLINK-19951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237090#comment-17237090 ] Shuiqiang Chen edited comment on FLINK-19951 at 11/23/20, 3:32 AM: --- Hi everyone, as illustrated in the comments above, I have added a timeout limitation for reading the output result from Kafka so that it won't get stuck forever, as part of [this pr|https://github.com/apache/flink/pull/14068]. And it will print the JM and TM logs for debugging purposes if the read data is not as expected. Will revisit this issue when it fails again (with more logs). Regarding the instance reported by [~xintongsong], I think it's not related to this issue and is already tracked in FLINK-20199.

was (Author: csq): Hi everyone, as illustrated in the comments above, I have added a timeout limitation for reading output result from Kafka so that it won't stuck forever. And it will print logs for JM and TM for debug purpose if the read data is not as expected. Please refer to [this pr|https://github.com/apache/flink/pull/14068] Overall, to figure out the root cause, we need to observe for a little while with more debug information for test failures.

> PyFlink end-to-end test stuck in "Reading kafka messages"
> -
>
> Key: FLINK-19951
> URL: https://issues.apache.org/jira/browse/FLINK-19951
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.12.0
> Reporter: Robert Metzger
> Assignee: Shuiqiang Chen
> Priority: Major
> Labels: test-stability
> Fix For: 1.12.0
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8837=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529
> {code}
> 2020-11-03T08:18:10.2935249Z Nov 03 08:18:10 Test PyFlink DataStream job:
> 2020-11-03T08:18:10.2936216Z Nov 03 08:18:10 Preparing Kafka...
> 2020-11-03T08:18:10.2948091Z Nov 03 08:18:10 Downloading Kafka from https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
> 2020-11-03T08:18:10.3024006Z % Total % Received % Xferd Average Speed Time Time Time Current
> 2020-11-03T08:18:10.3024610Z Dload Upload Total Spent Left Speed
> 2020-11-03T08:18:10.3024891Z
> 2020-11-03T08:18:10.6563956Z 0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0
> 2020-11-03T08:18:11.6568328Z 0 54.3M 0 32768 0 0 92275 0 0:10:18 --:--:-- 0:10:18 92044
> 2020-11-03T08:18:12.6540430Z 11 54.3M 11 6272k 0 0 4626k 0 0:00:12 0:00:01 0:00:11 4625k
> 2020-11-03T08:18:13.6585146Z 23 54.3M 23 12.6M 0 0 5521k 0 0:00:10 0:00:02 0:00:08 5521k
> 2020-11-03T08:18:14.6558377Z 36 54.3M 36 19.7M 0 0 6018k 0 0:00:09 0:00:03 0:00:06 6017k
> 2020-11-03T08:18:15.6593118Z 49 54.3M 49 26.7M 0 0 6297k 0 0:00:08 0:00:04 0:00:04 6297k
> 2020-11-03T08:18:16.653Z 62 54.3M 62 34.0M 0 0 6515k 0 0:00:08 0:00:05 0:00:03 6973k
> 2020-11-03T08:18:17.6544951Z 76 54.3M 76 41.8M 0 0 6747k 0 0:00:08 0:00:06 0:00:02 7322k
> 2020-11-03T08:18:18.2448109Z 91 54.3M 91 49.7M 0 0 6923k 0 0:00:08 0:00:07 0:00:01 7584k
> 2020-11-03T08:18:18.2450531Z 100 54.3M 100 54.3M 0 0 7010k 0 0:00:07 0:00:07 --:--:-- 7737k
> 2020-11-03T08:18:20.2751451Z Nov 03 08:18:20 Zookeeper Server has been started ...
> 2020-11-03T08:18:22.0064118Z Nov 03 08:18:22 Waiting for broker...
> 2020-11-03T08:18:25.4758082Z Nov 03 08:18:25 Created topic test-python-data-stream-source.
> 2020-11-03T08:18:25.8324767Z Nov 03 08:18:25 Sending messages to Kafka...
> 2020-11-03T08:18:35.2954788Z Nov 03 08:18:35 >>Created topic test-python-data-stream-sink.
> 2020-11-03T08:18:54.8314099Z Nov 03 08:18:54 Job has been submitted with JobID 1b0c317b47c69ee600937e1715ad9cce
> 2020-11-03T08:18:54.8348757Z Nov 03 08:18:54 Reading kafka messages...
> 2020-11-03T08:53:10.5246998Z ==
> 2020-11-03T08:53:10.5249381Z === WARNING: This E2E Run took already 80% of the allocated time budget of 250 minutes ===
> 2020-11-03T08:53:10.5251343Z ==
> {code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
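The bounded read added in the PR can be pictured as follows (an illustrative sketch assuming a kafka-python style consumer; the actual e2e test lives in a bash script under flink-end-to-end-tests):

{code:python}
import time


def read_messages(consumer, expected_count, timeout_seconds):
    """Polls until enough messages arrive or the deadline passes."""
    messages = []
    deadline = time.time() + timeout_seconds
    while len(messages) < expected_count and time.time() < deadline:
        # poll() returns a dict of {TopicPartition: [records]}.
        for batch in consumer.poll(timeout_ms=1000).values():
            messages.extend(batch)
    return messages  # The caller dumps the JM/TM logs when the result is short.
{code}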
[jira] [Updated] (FLINK-20199) PyFlink unstable e2e DataStream test
[ https://issues.apache.org/jira/browse/FLINK-20199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-20199: --- Description: [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9667=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529] {code} Nov 17 06:04:35 Reading kafka messages... Nov 17 06:05:17 Cancelling job 55acfb14436b24b9c6fb0f47d7297f83. Nov 17 06:05:17 Cancelled job 55acfb14436b24b9c6fb0f47d7297f83. Nov 17 06:05:18 Stopping taskexecutor daemon (pid: 110438) on host fv-az598-520. Nov 17 06:05:24 Stopping standalonesession daemon (pid: 110130) on host fv-az598-520. Nov 17 06:05:25 Waiting till process is stopped: pid = 123127 pattern = 'kafka' Nov 17 06:05:26 Waiting till process is stopped: pid = 123717 pattern = 'kafka' {code} was:https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9667=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529 > PyFlink unstable e2e DataStream test > > > Key: FLINK-20199 > URL: https://issues.apache.org/jira/browse/FLINK-20199 > Project: Flink > Issue Type: Bug > Components: API / Python >Reporter: Huang Xingbo >Priority: Major > Fix For: 1.12.0 > > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9667=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529] > > {code} > Nov 17 06:04:35 Reading kafka messages... > Nov 17 06:05:17 Cancelling job 55acfb14436b24b9c6fb0f47d7297f83. > Nov 17 06:05:17 Cancelled job 55acfb14436b24b9c6fb0f47d7297f83. > Nov 17 06:05:18 Stopping taskexecutor daemon (pid: 110438) on host > fv-az598-520. > Nov 17 06:05:24 Stopping standalonesession daemon (pid: 110130) on host > fv-az598-520. > Nov 17 06:05:25 Waiting till process is stopped: pid = 123127 pattern = > 'kafka' > Nov 17 06:05:26 Waiting till process is stopped: pid = 123717 pattern = > 'kafka' > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (FLINK-20199) PyFlink unstable e2e DataStream test
[ https://issues.apache.org/jira/browse/FLINK-20199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-20199: --- Comment: was deleted (was: Thank you for reporting the issue. It is related to [this ticket|https://issues.apache.org/jira/browse/FLINK-19951]) > PyFlink unstable e2e DataStream test > > > Key: FLINK-20199 > URL: https://issues.apache.org/jira/browse/FLINK-20199 > Project: Flink > Issue Type: Bug > Components: API / Python >Reporter: Huang Xingbo >Priority: Major > Fix For: 1.12.0 > > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9667=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529] > > {code} > Nov 17 06:04:35 Reading kafka messages... > Nov 17 06:05:17 Cancelling job 55acfb14436b24b9c6fb0f47d7297f83. > Nov 17 06:05:17 Cancelled job 55acfb14436b24b9c6fb0f47d7297f83. > Nov 17 06:05:18 Stopping taskexecutor daemon (pid: 110438) on host > fv-az598-520. > Nov 17 06:05:24 Stopping standalonesession daemon (pid: 110130) on host > fv-az598-520. > Nov 17 06:05:25 Waiting till process is stopped: pid = 123127 pattern = > 'kafka' > Nov 17 06:05:26 Waiting till process is stopped: pid = 123717 pattern = > 'kafka' > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-20135) Test Python DataStream API
[ https://issues.apache.org/jira/browse/FLINK-20135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237055#comment-17237055 ] Shuiqiang Chen edited comment on FLINK-20135 at 11/23/20, 1:40 AM: --- The following APIs have been tested, and most of them are functioning well.
|rebalance|
|forward|
|broadcast|
|process|
|assign_timestamp_and_watermark|
|partition_custom|
|add_sink|
|StreamExecutionEnvironment|
|from_collection|
|execute_async|
|add_source|
|add_python_file|
|set_python_requirement|
|set_python_archive|
|set_python_executable|
These two APIs do not work as expected:
|add_jars|
|add_classpaths|
I have created a Jira ticket and a PR for it: https://issues.apache.org/jira/browse/FLINK-20275

was (Author: csq): The following APIs have been tested, and most of them are functioning well. |rebalance| |forward| |broadcast| |process| |assign_timestamp_and_watermark| |partition_custom| |add_sink| |StreamExecutionEnvironment| |from_collection| |execute_async| |add_source| |add_python_file| |set_python_requirement| |set_python_archive| |set_python_executable| These two APIs do not work as expected: |add_jars| |add_classpaths| I have created Jira and pr repectively: https://issues.apache.org/jira/browse/FLINK-20275

> Test Python DataStream API > -- > > Key: FLINK-20135 > URL: https://issues.apache.org/jira/browse/FLINK-20135 > Project: Flink > Issue Type: Task > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Dian Fu >Assignee: Shuiqiang Chen >Priority: Critical > Fix For: 1.12.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-20275) The path delimiter for add_jars and add_classpaths in Python StreamExecutionEnvironment should be ;
[ https://issues.apache.org/jira/browse/FLINK-20275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-20275: --- Component/s: API / Python > The path delimiter for add_jars and add_classpaths in Python > StreamExecutionEnvironment should be ; > > > Key: FLINK-20275 > URL: https://issues.apache.org/jira/browse/FLINK-20275 > Project: Flink > Issue Type: Bug > Components: API / Python >Reporter: Shuiqiang Chen >Priority: Major > Fix For: 1.12.0 > > > Currently, the path delimiter for add_jars and add_classpaths in Python > StreamExecutionEnvironment is ",", which causes the REST client to fail to > upload the specified jars and hang forever without errors. It should be ";" > instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20275) The path delimiter for add_jars and add_classpaths in Python StreamExecutionEnvironment should be ;
Shuiqiang Chen created FLINK-20275: -- Summary: The path delimiter for add_jars and add_classpaths in Python StreamExecutionEnvironment should be ; Key: FLINK-20275 URL: https://issues.apache.org/jira/browse/FLINK-20275 Project: Flink Issue Type: Bug Reporter: Shuiqiang Chen Fix For: 1.12.0 Currently, the path delimiter for add_jars and add_classpaths in Python StreamExecutionEnvironment is ",", which causes the REST client to fail to upload the specified jars and hang forever without errors. It should be ";" instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
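For illustration, the API surface involved can be sketched as below; the jar paths are hypothetical, and the add_jars varargs signature is an assumption based on the current PyFlink API. The key point is how the environment joins the URLs into the 'pipeline.jars' option:
{code:python}
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Hypothetical jar locations, for illustration only.
env.add_jars('file:///tmp/connector.jar', 'file:///tmp/udfs.jar')

# The environment joins these URLs into the 'pipeline.jars' configuration
# value. Joining them with ',' yields a value the REST client cannot parse,
# so the upload hangs; the correct delimiter is ';', i.e.
#   pipeline.jars: file:///tmp/connector.jar;file:///tmp/udfs.jar
{code}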
[jira] [Comment Edited] (FLINK-20135) Test Python DataStream API
[ https://issues.apache.org/jira/browse/FLINK-20135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17235826#comment-17235826 ] Shuiqiang Chen edited comment on FLINK-20135 at 11/20/20, 1:50 AM: --- Thank you for creating this ticket. I have finished testing the following APIs for the Python DataStream API; all of them work as expected under normal conditions and throw exceptions on invalid inputs: |get_name| |name| |uid| |set_uid_hash| |set_parallelism| |set_max_parallelism| |get_type| |get_execution_environment| |get_execution_config| |force_non_parallel| |set_buffer_timeout| |start_new_chain| |disable_chaining| |slot_sharing_group| |map| |flat_map| |key_by| |filter| |union| |connect| |shuffle| |project| |rescale| |broadcast| |print| I will test the following APIs as soon as possible: |rebalance| |forward| |process| |assign_timestamp_and_watermark| |partition_custom| |add_sink| All these APIs have been documented. was (Author: csq): Thank you for creating this ticket. I have finished testing the following APIs for the Python DataStream API; all of them work as expected under normal conditions and throw exceptions on invalid inputs: |get_name| |name| |uid| |set_uid_hash| |set_parallelism| |set_max_parallelism| |get_type| |get_execution_environment| |get_execution_config| |force_non_parallel| |set_buffer_timeout| |start_new_chain| |disable_chaining| |slot_sharing_group| |map| |flat_map| |key_by| |filter| |union| |connect| |shuffle| |project| |rescale| |broadcast| |print| I will test the following APIs as soon as possible: |rebalance| |forward| |process| |assign_timestamp_and_watermark| |partition_custom| |add_sink| All these APIs have been documented. > Test Python DataStream API > -- > > Key: FLINK-20135 > URL: https://issues.apache.org/jira/browse/FLINK-20135 > Project: Flink > Issue Type: Task > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Dian Fu >Assignee: Shuiqiang Chen >Priority: Critical > Fix For: 1.12.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (FLINK-20123) Test native support of PyFlink on Kubernetes
[ https://issues.apache.org/jira/browse/FLINK-20123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-20123: --- Comment: was deleted (was: Thank you for creating this ticket. I have finished testing the following APIs for the Python DataStream API; all of them work as expected under normal conditions and throw exceptions on invalid inputs: |get_name| |name| |uid| |set_uid_hash| |set_parallelism| |set_max_parallelism| |get_type| |get_execution_environment| |get_execution_config| |force_non_parallel| |set_buffer_timeout| |start_new_chain| |disable_chaining| |slot_sharing_group| |map| |flat_map| |key_by| |filter| |union| |connect| |shuffle| |project| |rescale| |broadcast| |print| I will test the following APIs as soon as possible: |rebalance| |forward| |process| |assign_timestamp_and_watermark| |partition_custom| |add_sink| All these APIs have been documented.) > Test native support of PyFlink on Kubernetes > > > Key: FLINK-20123 > URL: https://issues.apache.org/jira/browse/FLINK-20123 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Robert Metzger >Assignee: Shuiqiang Chen >Priority: Critical > Fix For: 1.12.0 > > > > [General Information about the Flink 1.12 release > testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing] > When testing a feature, consider the following aspects: > - Is the documentation easy to understand > - Are the error messages, log messages, APIs etc. easy to understand > - Is the feature working as expected under normal conditions > - Is the feature working / failing as expected with invalid input, induced > errors etc. > If you find a problem during testing, please file a ticket > (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket. > During the testing keep us updated on tests conducted, or please write a > short summary of all things you have tested in the end. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20135) Test Python DataStream API
[ https://issues.apache.org/jira/browse/FLINK-20135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17235826#comment-17235826 ] Shuiqiang Chen commented on FLINK-20135: Thank you for creating this ticket. I have finished testing the following APIs for the Python DataStream API; all of them work as expected under normal conditions and throw exceptions on invalid inputs: |get_name| |name| |uid| |set_uid_hash| |set_parallelism| |set_max_parallelism| |get_type| |get_execution_environment| |get_execution_config| |force_non_parallel| |set_buffer_timeout| |start_new_chain| |disable_chaining| |slot_sharing_group| |map| |flat_map| |key_by| |filter| |union| |connect| |shuffle| |project| |rescale| |broadcast| |print| I will test the following APIs as soon as possible: |rebalance| |forward| |process| |assign_timestamp_and_watermark| |partition_custom| |add_sink| All these APIs have been documented. > Test Python DataStream API > -- > > Key: FLINK-20135 > URL: https://issues.apache.org/jira/browse/FLINK-20135 > Project: Flink > Issue Type: Task > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Dian Fu >Assignee: Shuiqiang Chen >Priority: Critical > Fix For: 1.12.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20123) Test native support of PyFlink on Kubernetes
[ https://issues.apache.org/jira/browse/FLINK-20123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17235491#comment-17235491 ] Shuiqiang Chen commented on FLINK-20123: Thank you for creating this ticket. I have finished testing the following APIs for the Python DataStream API; all of them work as expected under normal conditions and throw exceptions on invalid inputs: |get_name| |name| |uid| |set_uid_hash| |set_parallelism| |set_max_parallelism| |get_type| |get_execution_environment| |get_execution_config| |force_non_parallel| |set_buffer_timeout| |start_new_chain| |disable_chaining| |slot_sharing_group| |map| |flat_map| |key_by| |filter| |union| |connect| |shuffle| |project| |rescale| |broadcast| |print| I will test the following APIs as soon as possible: |rebalance| |forward| |process| |assign_timestamp_and_watermark| |partition_custom| |add_sink| All these APIs have been documented. > Test native support of PyFlink on Kubernetes > > > Key: FLINK-20123 > URL: https://issues.apache.org/jira/browse/FLINK-20123 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Robert Metzger >Assignee: Shuiqiang Chen >Priority: Critical > Fix For: 1.12.0 > > > > [General Information about the Flink 1.12 release > testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing] > When testing a feature, consider the following aspects: > - Is the documentation easy to understand > - Are the error messages, log messages, APIs etc. easy to understand > - Is the feature working as expected under normal conditions > - Is the feature working / failing as expected with invalid input, induced > errors etc. > If you find a problem during testing, please file a ticket > (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket. > During the testing keep us updated on tests conducted, or please write a > short summary of all things you have tested in the end. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-19951) PyFlink end-to-end test stuck in "Reading kafka messages"
[ https://issues.apache.org/jira/browse/FLINK-19951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233190#comment-17233190 ] Shuiqiang Chen edited comment on FLINK-19951 at 11/17/20, 1:58 AM: --- Hi [~dian.fu] [~rmetzger], thank you for reporting this issue. It's because the Kafka consumer could not read the expected number of messages (15), so it gets stuck here. The root cause might be that the Flink job is not running normally. Maybe we could print out the JM log when exiting the test, or read from Kafka within a specific time limit. I will have a try as soon as possible. was (Author: csq): Hi [~dian.fu], it's because the Kafka consumer could not read the expected number of messages (15), so it gets stuck here. The root cause might be that the Flink job is not running normally. Maybe we could print out the JM log when exiting the test. > PyFlink end-to-end test stuck in "Reading kafka messages" > - > > Key: FLINK-19951 > URL: https://issues.apache.org/jira/browse/FLINK-19951 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Robert Metzger >Priority: Critical > Labels: test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8837=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529 > {code} > 2020-11-03T08:18:10.2935249Z Nov 03 08:18:10 Test PyFlink DataStream job: > 2020-11-03T08:18:10.2936216Z Nov 03 08:18:10 Preparing Kafka... > 2020-11-03T08:18:10.2948091Z Nov 03 08:18:10 Downloading Kafka from > https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz > 2020-11-03T08:18:10.3024006Z % Total% Received % Xferd Average Speed > TimeTime Time Current > 2020-11-03T08:18:10.3024610Z Dload Upload > Total SpentLeft Speed > 2020-11-03T08:18:10.3024891Z > 2020-11-03T08:18:10.6563956Z 0 00 00 0 0 0 > --:--:-- --:--:-- --:--:-- 0 > 2020-11-03T08:18:11.6568328Z 0 54.3M0 327680 0 92275 0 > 0:10:18 --:--:-- 0:10:18 92044 > 2020-11-03T08:18:12.6540430Z 11 54.3M 11 6272k0 0 4626k 0 > 0:00:12 0:00:01 0:00:11 4625k > 2020-11-03T08:18:13.6585146Z 23 54.3M 23 12.6M0 0 5521k 0 > 0:00:10 0:00:02 0:00:08 5521k > 2020-11-03T08:18:14.6558377Z 36 54.3M 36 19.7M0 0 6018k 0 > 0:00:09 0:00:03 0:00:06 6017k > 2020-11-03T08:18:15.6593118Z 49 54.3M 49 26.7M0 0 6297k 0 > 0:00:08 0:00:04 0:00:04 6297k > 2020-11-03T08:18:16.653Z 62 54.3M 62 34.0M0 0 6515k 0 > 0:00:08 0:00:05 0:00:03 6973k > 2020-11-03T08:18:17.6544951Z 76 54.3M 76 41.8M0 0 6747k 0 > 0:00:08 0:00:06 0:00:02 7322k > 2020-11-03T08:18:18.2448109Z 91 54.3M 91 49.7M0 0 6923k 0 > 0:00:08 0:00:07 0:00:01 7584k > 2020-11-03T08:18:18.2450531Z 100 54.3M 100 54.3M0 0 7010k 0 > 0:00:07 0:00:07 --:--:-- 7737k > 2020-11-03T08:18:20.2751451Z Nov 03 08:18:20 Zookeeper Server has been > started ... > 2020-11-03T08:18:22.0064118Z Nov 03 08:18:22 Waiting for broker... > 2020-11-03T08:18:25.4758082Z Nov 03 08:18:25 Created topic > test-python-data-stream-source. > 2020-11-03T08:18:25.8324767Z Nov 03 08:18:25 Sending messages to Kafka... > 2020-11-03T08:18:35.2954788Z Nov 03 08:18:35 >>Created topic > test-python-data-stream-sink. > 2020-11-03T08:18:54.8314099Z Nov 03 08:18:54 Job has been submitted with > JobID 1b0c317b47c69ee600937e1715ad9cce > 2020-11-03T08:18:54.8348757Z Nov 03 08:18:54 Reading kafka messages... 
> 2020-11-03T08:53:10.5246998Z > == > 2020-11-03T08:53:10.5249381Z === WARNING: This E2E Run took already 80% of > the allocated time budget of 250 minutes === > 2020-11-03T08:53:10.5251343Z > == > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
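The e2e check itself is a shell script, but the bounded-read idea suggested above can be sketched in Python with the third-party kafka-python client. The topic name and message count come from the log above; the 120s budget and broker address are assumptions:
{code:python}
from kafka import KafkaConsumer  # third-party 'kafka-python' package

EXPECTED_MESSAGES = 15

consumer = KafkaConsumer(
    'test-python-data-stream-sink',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=120000)  # iteration ends after 120s without data

received = []
for record in consumer:
    received.append(record.value)
    if len(received) >= EXPECTED_MESSAGES:
        break

if len(received) < EXPECTED_MESSAGES:
    # Fail fast, leaving room to dump the JM log instead of hanging forever.
    raise AssertionError('read %d/%d messages before the deadline'
                         % (len(received), EXPECTED_MESSAGES))
{code}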
[jira] [Commented] (FLINK-19951) PyFlink end-to-end test stuck in "Reading kafka messages"
[ https://issues.apache.org/jira/browse/FLINK-19951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233190#comment-17233190 ] Shuiqiang Chen commented on FLINK-19951: Hi [~dian.fu], it's because the Kafka consumer could not read the expected number of messages (15), so it gets stuck here. The root cause might be that the Flink job is not running normally. Maybe we could print out the JM log when exiting the test. > PyFlink end-to-end test stuck in "Reading kafka messages" > - > > Key: FLINK-19951 > URL: https://issues.apache.org/jira/browse/FLINK-19951 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Robert Metzger >Priority: Critical > Labels: test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8837=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529 > {code} > 2020-11-03T08:18:10.2935249Z Nov 03 08:18:10 Test PyFlink DataStream job: > 2020-11-03T08:18:10.2936216Z Nov 03 08:18:10 Preparing Kafka... > 2020-11-03T08:18:10.2948091Z Nov 03 08:18:10 Downloading Kafka from > https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz > 2020-11-03T08:18:10.3024006Z % Total% Received % Xferd Average Speed > TimeTime Time Current > 2020-11-03T08:18:10.3024610Z Dload Upload > Total SpentLeft Speed > 2020-11-03T08:18:10.3024891Z > 2020-11-03T08:18:10.6563956Z 0 00 00 0 0 0 > --:--:-- --:--:-- --:--:-- 0 > 2020-11-03T08:18:11.6568328Z 0 54.3M0 327680 0 92275 0 > 0:10:18 --:--:-- 0:10:18 92044 > 2020-11-03T08:18:12.6540430Z 11 54.3M 11 6272k0 0 4626k 0 > 0:00:12 0:00:01 0:00:11 4625k > 2020-11-03T08:18:13.6585146Z 23 54.3M 23 12.6M0 0 5521k 0 > 0:00:10 0:00:02 0:00:08 5521k > 2020-11-03T08:18:14.6558377Z 36 54.3M 36 19.7M0 0 6018k 0 > 0:00:09 0:00:03 0:00:06 6017k > 2020-11-03T08:18:15.6593118Z 49 54.3M 49 26.7M0 0 6297k 0 > 0:00:08 0:00:04 0:00:04 6297k > 2020-11-03T08:18:16.653Z 62 54.3M 62 34.0M0 0 6515k 0 > 0:00:08 0:00:05 0:00:03 6973k > 2020-11-03T08:18:17.6544951Z 76 54.3M 76 41.8M0 0 6747k 0 > 0:00:08 0:00:06 0:00:02 7322k > 2020-11-03T08:18:18.2448109Z 91 54.3M 91 49.7M0 0 6923k 0 > 0:00:08 0:00:07 0:00:01 7584k > 2020-11-03T08:18:18.2450531Z 100 54.3M 100 54.3M0 0 7010k 0 > 0:00:07 0:00:07 --:--:-- 7737k > 2020-11-03T08:18:20.2751451Z Nov 03 08:18:20 Zookeeper Server has been > started ... > 2020-11-03T08:18:22.0064118Z Nov 03 08:18:22 Waiting for broker... > 2020-11-03T08:18:25.4758082Z Nov 03 08:18:25 Created topic > test-python-data-stream-source. > 2020-11-03T08:18:25.8324767Z Nov 03 08:18:25 Sending messages to Kafka... > 2020-11-03T08:18:35.2954788Z Nov 03 08:18:35 >>Created topic > test-python-data-stream-sink. > 2020-11-03T08:18:54.8314099Z Nov 03 08:18:54 Job has been submitted with > JobID 1b0c317b47c69ee600937e1715ad9cce > 2020-11-03T08:18:54.8348757Z Nov 03 08:18:54 Reading kafka messages... > 2020-11-03T08:53:10.5246998Z > == > 2020-11-03T08:53:10.5249381Z === WARNING: This E2E Run took already 80% of > the allocated time budget of 250 minutes === > 2020-11-03T08:53:10.5251343Z > == > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-20164) Add docs for ProcessFunction and Timer in Python DataStream API.
[ https://issues.apache.org/jira/browse/FLINK-20164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-20164: --- Summary: Add docs for ProcessFunction and Timer in Python DataStream API. (was: Add docs for ProcessFunction and Timer for Python DataStream API.) > Add docs for ProcessFunction and Timer in Python DataStream API. > > > Key: FLINK-20164 > URL: https://issues.apache.org/jira/browse/FLINK-20164 > Project: Flink > Issue Type: Improvement > Components: API / Python, Documentation >Reporter: Shuiqiang Chen >Assignee: Shuiqiang Chen >Priority: Minor > Fix For: 1.12.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20164) Add docs for ProcessFunction and Timer for Python DataStream API.
Shuiqiang Chen created FLINK-20164: -- Summary: Add docs for ProcessFunction and Timer for Python DataStream API. Key: FLINK-20164 URL: https://issues.apache.org/jira/browse/FLINK-20164 Project: Flink Issue Type: Improvement Components: API / Python, Documentation Reporter: Shuiqiang Chen Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20135) Test Python DataStream API
[ https://issues.apache.org/jira/browse/FLINK-20135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17231578#comment-17231578 ] Shuiqiang Chen commented on FLINK-20135: [~dian.fu] Thank you for bringing up the task; sure, I will do the tests. > Test Python DataStream API > -- > > Key: FLINK-20135 > URL: https://issues.apache.org/jira/browse/FLINK-20135 > Project: Flink > Issue Type: Task > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Dian Fu >Priority: Critical > Fix For: 1.12.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20137) User cannot obtain the timestamp of current processing element in the ProcessFunction of Python DataStream API
Shuiqiang Chen created FLINK-20137: -- Summary: User cannot obtain the timestamp of current processing element in the ProcessFunction of Python DataStream API Key: FLINK-20137 URL: https://issues.apache.org/jira/browse/FLINK-20137 Project: Flink Issue Type: Bug Components: API / Python Reporter: Shuiqiang Chen Fix For: 1.12.0 Currently, users cannot obtain the timestamp of the current element being processed in the ProcessFunction of the Python DataStream API. This is because we forgot to emit the timestamp of the current record in the upstream operator. -- This message was sent by Atlassian Jira (v8.3.4#803005)
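A minimal sketch of the user-facing pattern this bug breaks, assuming the PyFlink 1.12 ProcessFunction interface, where results are emitted by yielding:
{code:python}
from pyflink.datastream.functions import ProcessFunction


class TimestampEcho(ProcessFunction):

    def process_element(self, value, ctx):
        # ctx.timestamp() should return the event timestamp attached to the
        # current record; per this ticket it is lost because the upstream
        # operator does not forward it.
        yield value, ctx.timestamp()
{code}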
[jira] [Created] (FLINK-20016) Support TimestampAssigner and WatermarkGenerator for Python DataStream API.
Shuiqiang Chen created FLINK-20016: -- Summary: Support TimestampAssigner and WatermarkGenerator for Python DataStream API. Key: FLINK-20016 URL: https://issues.apache.org/jira/browse/FLINK-20016 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Shuiqiang Chen Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
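As a rough sketch of the API being added here, mirroring the Java counterparts and assuming each record's second field carries its event time in epoch milliseconds:
{code:python}
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy


class SecondFieldTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value, record_timestamp):
        # Use the second field of each record as its event time (epoch ms).
        return value[1]


watermark_strategy = (
    WatermarkStrategy
    .for_monotonous_timestamps()
    .with_timestamp_assigner(SecondFieldTimestampAssigner()))

# Applied to a stream:
#   ds = ds.assign_timestamps_and_watermarks(watermark_strategy)
{code}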
[jira] [Created] (FLINK-19821) Add ProcessFunction and timer access for Python DataStream API.
Shuiqiang Chen created FLINK-19821: -- Summary: Add ProcessFunction and timer access for Python DataStream API. Key: FLINK-19821 URL: https://issues.apache.org/jira/browse/FLINK-19821 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Shuiqiang Chen Fix For: 1.12.0 Add ProcessFunction and timer-access interfaces for the Python DataStream API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
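A minimal sketch of the timer access being added, shown on the keyed variant and assuming PyFlink 1.12 signatures (process_element and on_timer emit results by yielding); the 10s delay is illustrative:
{code:python}
from pyflink.datastream.functions import KeyedProcessFunction


class InactivityAlert(KeyedProcessFunction):

    def process_element(self, value, ctx):
        # Schedule an event-time timer 10s after this record's timestamp.
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 10000)
        yield value

    def on_timer(self, timestamp, ctx):
        # Invoked once the watermark passes the registered timestamp.
        yield 'timer fired for key %s at %d' % (ctx.get_current_key(), timestamp)
{code}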
[jira] [Commented] (FLINK-19770) PythonProgramOptionsTest requires package phase before execution
[ https://issues.apache.org/jira/browse/FLINK-19770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219404#comment-17219404 ] Shuiqiang Chen commented on FLINK-19770: [~juha.mynttinen] Thank you for reporting this issue. As [~chesnay] mentioned, we should make these test cases integration tests that are executed after the package phase has completed. I would like to fix it. > PythonProgramOptionsTest requires package phase before execution > > > Key: FLINK-19770 > URL: https://issues.apache.org/jira/browse/FLINK-19770 > Project: Flink > Issue Type: Bug > Components: API / Python, Tests >Affects Versions: 1.11.2 >Reporter: Juha Mynttinen >Priority: Minor > Labels: starter > Fix For: 1.12.0, 1.11.3 > > > The PR [https://github.com/apache/flink/pull/13322] recently added the test > method testConfigurePythonExecution in > org.apache.flink.client.cli.PythonProgramOptionsTest. > > "mvn clean verify" fails for me in testConfigurePythonExecution: > > ... > INFO] Running org.apache.flink.client.cli.PythonProgramOptionsTest > [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.433 > s <<< FAILURE! - in org.apache.flink.client.cli.PythonProgramOptionsTest > [ERROR] > testConfigurePythonExecution(org.apache.flink.client.cli.PythonProgramOptionsTest) > Time elapsed: 0.019 s <<< ERROR! > java.nio.file.NoSuchFileException: target/dummy-job-jar > at > java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92) > at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) > at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116) > at > java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55) > at > java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:149) > at > java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99) > at java.base/java.nio.file.Files.readAttributes(Files.java:1763) > at > java.base/java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219) > at java.base/java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276) > at java.base/java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:322) > at java.base/java.nio.file.Files.walkFileTree(Files.java:2716) > at java.base/java.nio.file.Files.walkFileTree(Files.java:2796) > at > org.apache.flink.client.cli.PythonProgramOptionsTest.testConfigurePythonExecution(PythonProgramOptionsTest.java:131) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137) > at >
[jira] [Commented] (FLINK-19484) Kubernetes pyflink application test failed with "error executing jsonpath "{range .items[*]}{.metadata.name}{\"\\n\"}{end}": Error executing template: not in rang"
[ https://issues.apache.org/jira/browse/FLINK-19484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17212208#comment-17212208 ] Shuiqiang Chen commented on FLINK-19484: [~dian.fu] Thank you for reporting this issue. It might be a matter of minikube resource allocation. The image for the native K8s PyFlink application is rather large, so starting up the pod might take some time. Currently, the timeout for waiting for the job manager's availability is only 30s; increasing it to 120s might resolve this problem. > Kubernetes pyflink application test failed with "error executing jsonpath > "{range .items[*]}{.metadata.name}{\"\\n\"}{end}": Error executing template: > not in rang" > --- > > Key: FLINK-19484 > URL: https://issues.apache.org/jira/browse/FLINK-19484 > Project: Flink > Issue Type: Bug > Components: API / Python, Tests >Affects Versions: 1.12.0 >Reporter: Dian Fu >Priority: Critical > Labels: test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7139=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529 > {code} > 2020-09-30T21:14:41.0570715Z 2020-09-30 21:14:41,056 INFO > org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create > flink application cluster flink-native-k8s-pyflink-application-1 > successfully, JobManager Web Interface: http://10.1.0.4:30141 > 2020-09-30T21:15:11.2323195Z error: timed out waiting for the condition on > deployments/flink-native-k8s-pyflink-application-1 > 2020-09-30T21:15:11.2384760Z Stopping job timeout watchdog (with pid=111683) > 2020-09-30T21:15:11.2386729Z Debugging failed Kubernetes test: > 2020-09-30T21:15:11.2387191Z Currently existing Kubernetes resources > 2020-09-30T21:15:11.3502610Z NAME > TYPECLUSTER-IP EXTERNAL-IP PORT(S) AGE > 2020-09-30T21:15:11.3506194Z service/flink-native-k8s-pyflink-application-1 > ClusterIP None 6123/TCP,6124/TCP 31s > 2020-09-30T21:15:11.3507403Z > service/flink-native-k8s-pyflink-application-1-rest NodePort > 10.104.215.1 8081:30141/TCP 31s > 2020-09-30T21:15:11.3529743Z service/kubernetes > ClusterIP 10.96.0.1 443/TCP 18m > 2020-09-30T21:15:11.3530391Z > 2020-09-30T21:15:11.3531349Z NAME > READY UP-TO-DATE AVAILABLE AGE > 2020-09-30T21:15:11.3532200Z > deployment.apps/flink-native-k8s-pyflink-application-1 0/1 0 > 0 31s > 2020-09-30T21:15:11.4887105Z Name: > flink-native-k8s-pyflink-application-1 > 2020-09-30T21:15:11.4887491Z Namespace: default > 2020-09-30T21:15:11.4888028Z Labels: > app=flink-native-k8s-pyflink-application-1 > 2020-09-30T21:15:11.4888534Ztype=flink-native-kubernetes > 2020-09-30T21:15:11.443Z Annotations: > 2020-09-30T21:15:11.4890558Z Selector: > app=flink-native-k8s-pyflink-application-1,component=jobmanager,type=flink-native-kubernetes > 2020-09-30T21:15:11.4900945Z Type: ClusterIP > 2020-09-30T21:15:11.4901267Z IP:None > 2020-09-30T21:15:11.4903601Z Port: jobmanager-rpc 6123/TCP > 2020-09-30T21:15:11.4903911Z TargetPort:6123/TCP > 2020-09-30T21:15:11.4904155Z Endpoints: > 2020-09-30T21:15:11.4907175Z Port: blobserver 6124/TCP > 2020-09-30T21:15:11.4907586Z TargetPort:6124/TCP > 2020-09-30T21:15:11.4907842Z Endpoints: > 2020-09-30T21:15:11.4908063Z Session Affinity: None > 2020-09-30T21:15:11.4908298Z Events: > 2020-09-30T21:15:11.4970434Z > 2020-09-30T21:15:11.4970653Z > 2020-09-30T21:15:11.4971488Z Name: > flink-native-k8s-pyflink-application-1-rest > 2020-09-30T21:15:11.4971843Z Namespace:default > 2020-09-30T21:15:11.4972563Z Labels: > 
app=flink-native-k8s-pyflink-application-1 > 2020-09-30T21:15:11.4973297Z > type=flink-native-kubernetes > 2020-09-30T21:15:11.4973749Z Annotations: > 2020-09-30T21:15:11.4974568Z Selector: > app=flink-native-k8s-pyflink-application-1,component=jobmanager,type=flink-native-kubernetes > 2020-09-30T21:15:11.4974969Z Type: NodePort > 2020-09-30T21:15:11.4976265Z IP: 10.104.215.1 > 2020-09-30T21:15:11.4976620Z Port: rest 8081/TCP > 2020-09-30T21:15:11.4976991Z TargetPort: 8081/TCP >
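The suggested 30s to 120s change belongs in the test scripts, but the waiting pattern can be sketched generically. The REST address comes from the log above and '/overview' is Flink's monitoring REST endpoint; everything else here is an illustrative assumption:
{code:python}
import time
import urllib.request


def wait_for_jobmanager(rest_url, timeout_s=120, interval_s=5):
    """Poll the JobManager REST endpoint until it responds or the deadline passes."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        try:
            with urllib.request.urlopen(rest_url + '/overview', timeout=5) as resp:
                if resp.status == 200:
                    return True
        except OSError:
            pass  # pod not up yet; retry after a short pause
        time.sleep(interval_s)
    return False


# e.g. wait_for_jobmanager('http://10.1.0.4:30141', timeout_s=120)
{code}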
[jira] [Updated] (FLINK-19145) Add PyFlink-walkthrough in Flink playground.
[ https://issues.apache.org/jira/browse/FLINK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-19145: --- Description: Add the walkthrough for PyFlink in Flink playground. > Add PyFlink-walkthrough in Flink playground. > > > Key: FLINK-19145 > URL: https://issues.apache.org/jira/browse/FLINK-19145 > Project: Flink > Issue Type: Improvement > Components: Documentation / Training / Exercises >Reporter: Shuiqiang Chen >Priority: Major > Fix For: 1.12.0 > > > Add the walkthrough for PyFlink in Flink playground. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19145) Add PyFlink-walkthrough to Flink playground.
[ https://issues.apache.org/jira/browse/FLINK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuiqiang Chen updated FLINK-19145: --- Summary: Add PyFlink-walkthrough to Flink playground. (was: Add PyFlink-walkthrough in Flink playground.) > Add PyFlink-walkthrough to Flink playground. > > > Key: FLINK-19145 > URL: https://issues.apache.org/jira/browse/FLINK-19145 > Project: Flink > Issue Type: Improvement > Components: Documentation / Training / Exercises >Reporter: Shuiqiang Chen >Priority: Major > Fix For: 1.12.0 > > > Add the walkthrough for PyFlink in Flink playground. -- This message was sent by Atlassian Jira (v8.3.4#803005)