[jira] [Commented] (FLINK-30966) Flink SQL IF FUNCTION logic error

2023-05-30 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727780#comment-17727780
 ] 

Shuiqiang Chen commented on FLINK-30966:


[~luoyuxia] Sorry for the delay. I will continue working on the issue and 
update the PR as soon as possible.

> Flink SQL IF FUNCTION logic error
> -
>
> Key: FLINK-30966
> URL: https://issues.apache.org/jira/browse/FLINK-30966
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.16.1
>Reporter: 谢波
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
>
> my data is 
> {code:java}
> //
> { "before": { "status": "sent" }, "after": { "status": "succeed" }, "op": 
> "u", "ts_ms": 1671926400225, "transaction": null } {code}
> my sql is 
>  
> {code:java}
> CREATE TABLE t
> (
> before ROW (
> status varchar (32)
> ),
> after ROW (
> status varchar (32)
> ),
> ts_ms bigint,
> op   string,
> kafka_timestamp  timestamp METADATA FROM 'timestamp',
> -- @formatter:off
> proctime AS PROCTIME()
> -- @formatter:on
> ) WITH (
> 'connector' = 'kafka',
> -- 'topic' = '',
> 'topic' = 'test',
> 'properties.bootstrap.servers' = ' ',
> 'properties.group.id' = '',
> 'format' = 'json',
> 'scan.topic-partition-discovery.interval' = '60s',
> 'scan.startup.mode' = 'earliest-offset',
> 'json.ignore-parse-errors' = 'true'
>  );
> create table p
> (
> status  STRING ,
> before_status  STRING ,
> after_status  STRING ,
> metadata_operation  STRING COMMENT 'source record operation type',
> dt  STRING
> )WITH (
> 'connector' = 'print'
>  );
> INSERT INTO p
> SELECT
>IF(op <> 'd', after.status, before.status),
> before.status,
> after.status,
>op AS metadata_operation,
>DATE_FORMAT(kafka_timestamp, 'yyyy-MM-dd') AS dt
> FROM t;
>  {code}
>  my local env output is 
>  
>  
> {code:java}
> +I[null, sent, succeed, u, 2023-02-08] {code}
>  
>  my production env output is 
> {code:java}
> +I[sent, sent, succeed, u, 2023-02-08]  {code}
> Why?  
> This looks like a bug.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31848) And Operator has side effect when operands have udf

2023-04-23 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715419#comment-17715419
 ] 

Shuiqiang Chen edited comment on FLINK-31848 at 4/23/23 9:39 AM:
-

Here is a test case:

{code:scala}
  def testRowAndRow(): Unit = {
    val sqlQuery = "SELECT cast(b > 2 and c < 3 as string),b,c FROM MyTableRow "

    val data = List(
      Row.of("Hello", Int.box(1), Int.box(1)),
      // b is null here, so b > 2 is UNKNOWN
      Row.of("Hello", null, Int.box(1)),
      Row.of("Hello again", Int.box(3), Int.box(2)))

    implicit val tpe: TypeInformation[Row] =
      new RowTypeInfo(Types.STRING, Types.INT, Types.INT)

    val ds = env.fromCollection(data)

    val t = ds.toTable(tEnv, 'a, 'b, 'c)
    tEnv.createTemporaryView("MyTableRow", t)

    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    val sink = new TestingAppendSink
    result.addSink(sink)
    env.execute()

    // One expected string per output row
    val expected = List("FALSE,1,1", "TRUE,3,2", "null,null,1")
    assertEquals(expected.sorted, sink.getAppendResults.sorted)
  }
{code}
The expected result is List(FALSE,1,1, TRUE,3,2, null,null,1),
but the actual result is List(FALSE,1,1, FALSE,null,1, TRUE,3,2).





> And Operator has side effect when operands have udf
> ---
>
> Key: FLINK-31848
> URL: https://issues.apache.org/jira/browse/FLINK-31848
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.2
>Reporter: zju_zsx
>Priority: Major
> Attachments: image-2023-04-19-14-54-46-458.png
>
>
>  
> {code:java}
> CREATE TABLE kafka_source (
>    `content` varchar,
>    `testid` bigint,
>    `extra` int
>    );
> CREATE TABLE console_sink (
>    `content` varchar,
>    `testid` bigint
>  )
>   with (
>     'connector' = 'print'
> );
> insert into console_sink
> select 
>    content,testid+1
> from kafka_source where testid is not null and testid > 0 and my_udf(testid) 
> != 0; {code}
> my_udf requires testid to be non-null, but the conditions testid is 
> not null and testid > 0 do not take effect.
>  
> In ScalarOperatorGens.generateAnd
> !image-2023-04-19-14-54-46-458.png!
> if left.nullTerm is true, the right code will still be executed.
> It seems that
> {code:java}
> if (!${left.nullTerm} && !${left.resultTerm}) {code}
> can be safely replaced with 
> {code:java}
> if (!${left.resultTerm}){code}
> ? 
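
The behaviour described in the issue can be illustrated with a small Java sketch. It is a simplified model, not Flink's generated code: the class and the methods skipsRight and udfArguments are hypothetical names introduced here, and the left operand is assumed to be just testid > 0 (how the planner actually arranges the three conjuncts is not stated in the issue). The sketch records which testid values would reach a UDF on the right side when the right operand is skipped only for a non-null FALSE.

```java
import java.util.ArrayList;
import java.util.List;

public class RightOperandEvaluationSketch {

    /** The quoted condition: the right operand is skipped only for a non-null FALSE. */
    static boolean skipsRight(boolean leftNullTerm, boolean leftResultTerm) {
        return !leftNullTerm && !leftResultTerm;
    }

    /** Returns the arguments a UDF on the right side would receive (hypothetical helper). */
    static List<Long> udfArguments(Long[] testids) {
        List<Long> calls = new ArrayList<>();
        for (Long testid : testids) {
            // Assumed left operand: testid > 0, which is UNKNOWN when testid is null.
            boolean leftNullTerm = (testid == null);
            boolean leftResultTerm = !leftNullTerm && testid > 0;
            if (!skipsRight(leftNullTerm, leftResultTerm)) {
                calls.add(testid); // stands in for evaluating my_udf(testid)
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        // The null testid reaches the UDF, the negative one does not.
        System.out.println(udfArguments(new Long[] {5L, null, -1L})); // prints [5, null]
    }
}
```

In this model the null value reaches the right operand because an UNKNOWN left side does not short-circuit, which matches the symptom reported for my_udf.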





[jira] [Commented] (FLINK-31848) And Operator has side effect when operands have udf

2023-04-23 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715419#comment-17715419
 ] 

Shuiqiang Chen commented on FLINK-31848:


Here is a test case:

{code:scala}
  def testRowAndRow(): Unit = {
    val sqlQuery = "SELECT cast(b > 2 and c < 3 as string),b,c FROM MyTableRow "

    val data = List(
      Row.of("Hello", Int.box(1), Int.box(1)),
      // b is null here, so b > 2 is UNKNOWN
      Row.of("Hello", null, Int.box(1)),
      Row.of("Hello again", Int.box(3), Int.box(2)))

    implicit val tpe: TypeInformation[Row] =
      new RowTypeInfo(Types.STRING, Types.INT, Types.INT)

    val ds = env.fromCollection(data)

    val t = ds.toTable(tEnv, 'a, 'b, 'c)
    tEnv.createTemporaryView("MyTableRow", t)

    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    val sink = new TestingAppendSink
    result.addSink(sink)
    env.execute()

    // One expected string per output row
    val expected = List("FALSE,1,1", "TRUE,3,2", "null,null,1")
    assertEquals(expected.sorted, sink.getAppendResults.sorted)
  }
{code}
The expected result is List(FALSE,1,1, TRUE,3,2, null,null,1),
but the actual result is List(FALSE,1,1, FALSE,null,1, TRUE,3,2).




[jira] [Commented] (FLINK-31848) And Operator has side effect when operands have udf

2023-04-23 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715415#comment-17715415
 ] 

Shuiqiang Chen commented on FLINK-31848:


Hi [~zju_zsx],

Take a table MyTable(a INT, b INT) with three rows as an example:
(1, 1), 
(null, 1), 
(3, 3)

and the query

select * from MyTable where a < 2 and b < 2

For each row, the values of left.nullTerm and left.resultTerm would be:

false, true // a is not null and a < 2 evaluates to true, so the right code 
must be evaluated
true, false // a is null, so the result of a < 2 is UNKNOWN; a < 2 is not 
evaluated, but b < 2 still has to be evaluated as right.code
false, false // a is not null and a < 2 evaluates to false, so there is no 
need to evaluate the right code

With if(!left.resultTerm), we treat null < 2 as false, and the evaluation becomes:
left && right
TRUE && 1 < 2  -> TRUE
FALSE && skipped -> FALSE
FALSE && skipped -> FALSE

The final result of the AND operation, in row order, is:
nullTerm  resultTerm
false   true
false   false
false   false

But with if(!left.nullTerm && !left.resultTerm), it should be:
left && right
TRUE && 1 < 2 -> TRUE
UNKNOWN && 1 < 2 -> UNKNOWN
FALSE && skipped -> FALSE

The final result of the AND operation, in row order, is:
nullTerm  resultTerm
false   true
true   false
false   false

So after the simplification the result is no longer consistent: the second 
row yields FALSE instead of UNKNOWN.
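
The comparison of the two branch conditions can be reproduced with a short Java sketch. This is only a model under stated assumptions: the Tri enum and the methods lessThan, combine, andWithNullCheck and andWithoutNullCheck are hypothetical names made up for illustration, and none of this is the code emitted by ScalarOperatorGens. It assumes, as in the comment, that resultTerm is false whenever nullTerm is true.

```java
import java.util.Arrays;
import java.util.function.Supplier;

public class AndShortCircuitSketch {

    /** Three-valued boolean; nullTerm()/resultTerm() mirror the two terms in the discussion. */
    enum Tri {
        TRUE, FALSE, UNKNOWN;

        boolean nullTerm() { return this == UNKNOWN; }
        boolean resultTerm() { return this == TRUE; } // false for both FALSE and UNKNOWN
    }

    /** Evaluates "value < bound" for a nullable value. */
    static Tri lessThan(Integer value, int bound) {
        if (value == null) return Tri.UNKNOWN;
        return value < bound ? Tri.TRUE : Tri.FALSE;
    }

    /** SQL AND once both operands are known. */
    private static Tri combine(Tri left, Tri right) {
        if (left == Tri.FALSE || right == Tri.FALSE) return Tri.FALSE;
        if (left == Tri.UNKNOWN || right == Tri.UNKNOWN) return Tri.UNKNOWN;
        return Tri.TRUE;
    }

    /** Models if (!left.nullTerm && !left.resultTerm): skip right only on a definite FALSE. */
    static Tri andWithNullCheck(Tri left, Supplier<Tri> right) {
        if (!left.nullTerm() && !left.resultTerm()) return Tri.FALSE;
        return combine(left, right.get());
    }

    /** Models if (!left.resultTerm): skip right whenever resultTerm is false. */
    static Tri andWithoutNullCheck(Tri left, Supplier<Tri> right) {
        if (!left.resultTerm()) return Tri.FALSE; // UNKNOWN collapses to FALSE here
        return combine(left, right.get());
    }

    public static void main(String[] args) {
        Integer[][] rows = {{1, 1}, {null, 1}, {3, 3}};
        for (Integer[] row : rows) {
            Tri left = lessThan(row[0], 2);
            Tri withCheck = andWithNullCheck(left, () -> lessThan(row[1], 2));
            Tri withoutCheck = andWithoutNullCheck(left, () -> lessThan(row[1], 2));
            System.out.println(Arrays.toString(row) + ": " + withCheck + " vs " + withoutCheck);
        }
    }
}
```

In this model the two variants agree on the rows (1, 1) and (3, 3) and differ only on (null, 1), where the variant with the null check returns UNKNOWN and the simplified variant returns FALSE. In a WHERE clause both values drop the row, so the difference only becomes visible when the boolean itself is projected.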



[jira] (FLINK-31848) And Operator has side effect when operands have udf

2023-04-23 Thread Shuiqiang Chen (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31848 ]


Shuiqiang Chen deleted comment on FLINK-31848:


was (Author: csq):
Hi [~zju_zsx],

take a table MyTable(a INT, b INT) for instance, there are three rows:
(1, 1), 
(null, 2), 
(3, 3)

and for the query

select * from MyTable where a < 2 and b < 2

For each row, the values of left.nullTerm and left.resultTerm would be:

false, true // a is not null and a < 2 evaluates to true, so the right code 
must be evaluated
true, false // a is null, so the result of a < 2 is UNKNOWN; a < 2 is not 
evaluated, but b < 2 still has to be evaluated as right.code
false, false // a is not null and a < 2 evaluates to false, so there is no 
need to evaluate the right code

By simplifying to if(!left.resultTerm), we assume the result of null < 2 to be 
false, but it is actually UNKNOWN.

In terms of correctness, if(!left.resultTerm) and if(!left.nullTerm && 
!left.resultTerm) are identical. Since the generated code is not exposed to 
users and correctness is not affected, either implementation is fine with 
me.





[jira] [Commented] (FLINK-31848) And Operator has side effect when operands have udf

2023-04-23 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715392#comment-17715392
 ] 

Shuiqiang Chen commented on FLINK-31848:


Hi [~zju_zsx],

take a table MyTable(a INT, b INT) for instance, there are three rows:
(1, 1), 
(null, 2), 
(3, 3)

and for the query

select * from MyTable where a < 2 and b < 2

For each row, the values of left.nullTerm and left.resultTerm would be:

false, true // a is not null and a < 2 evaluates to true, so the right code 
must be evaluated
true, false // a is null, so the result of a < 2 is UNKNOWN; a < 2 is not 
evaluated, but b < 2 still has to be evaluated as right.code
false, false // a is not null and a < 2 evaluates to false, so there is no 
need to evaluate the right code

By simplifying to if(!left.resultTerm), we assume the result of null < 2 to be 
false, but it is actually UNKNOWN.

In terms of correctness, if(!left.resultTerm) and if(!left.nullTerm && 
!left.resultTerm) are identical. Since the generated code is not exposed to 
users and correctness is not affected, either implementation is fine with 
me.





[jira] [Comment Edited] (FLINK-31848) And Operator has side effect when operands have udf

2023-04-21 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715115#comment-17715115
 ] 

Shuiqiang Chen edited comment on FLINK-31848 at 4/21/23 6:32 PM:
-

[~martijnvisser][~zju_zsx][~jark] Thanks for your reply. After running several 
test cases based on release-1.15 (which I previously analyzed) and the latest 
1.18-SNAPSHOT branch, it seems that the left.resultTerm in the left.code is 
always assigned a boolean value, so `if (!${left.resultTerm})` will never cause 
a syntax error. Therefore I agree with [~zju_zsx] that the logic can be safely 
simplified. That said, `if (!${left.nullTerm} && !${left.resultTerm})` is a 
little more intuitive, since ${left.nullTerm} indicates whether the left result 
is UNKNOWN in three-valued logic.





[jira] [Comment Edited] (FLINK-31848) And Operator has side effect when operands have udf

2023-04-21 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715115#comment-17715115
 ] 

Shuiqiang Chen edited comment on FLINK-31848 at 4/21/23 6:31 PM:
-

[~martijnvisser][~zju_zsx][~jark] Thanks for your reply. After running several 
test cases based on release-1.15 (which I previously analyzed) and the latest 
1.18-SNAPSHOT branch, it seems that the left.resultTerm in the left.code is 
always assigned a boolean value, so `if (!${left.resultTerm})` will never cause 
a syntax error. Therefore I agree with [~zju_zsx] that the logic can be safely 
simplified. That said, `if (!${left.nullTerm} && !${left.resultTerm})` is a 
little more intuitive, since ${left.nullTerm} indicates whether the left result 
is UNKNOWN in three-valued logic.





[jira] [Comment Edited] (FLINK-31848) And Operator has side effect when operands have udf

2023-04-21 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715115#comment-17715115
 ] 

Shuiqiang Chen edited comment on FLINK-31848 at 4/21/23 6:29 PM:
-

[~martijnvisser][~zju_zsx][~jark] Thanks for your reply. After running several 
test cases based on release-1.15 (which I previously analyzed) and the latest 
1.18-SNAPSHOT branch, it seems that the left.resultTerm in the left.code is 
always assigned a boolean value, so _if (!${left.resultTerm})_ will never cause 
a syntax error. Therefore I agree with [~zju_zsx] that the logic can be safely 
simplified. That said, _if (!${left.nullTerm} && !${left.resultTerm})_ in the 
current code base is a little more intuitive, since ${left.nullTerm} indicates 
whether the left result is UNKNOWN.





[jira] [Comment Edited] (FLINK-31848) And Operator has side effect when operands have udf

2023-04-21 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715115#comment-17715115
 ] 

Shuiqiang Chen edited comment on FLINK-31848 at 4/21/23 6:18 PM:
-

[~martijnvisser][~zju_zsx][~jark] Thanks for your reply. After running several 
test cases based on release-1.15 (which I previously analyzed) and the latest 
1.18-SNAPSHOT branch, it seems that the left.resultTerm in the left.code is 
always assigned a boolean value, so if (!${left.resultTerm}) will never cause a 
syntax error. Therefore I agree with [~zju_zsx] that the logic can be safely 
simplified.





[jira] [Commented] (FLINK-31848) And Operator has side effect when operands have udf

2023-04-21 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715115#comment-17715115
 ] 

Shuiqiang Chen commented on FLINK-31848:


[~martijnvisser][~zju_zsx][~jark] Thanks for your reply. After running several 
test cases based on release-1.15 (which I previously analyzed) and the latest 
1.18-SNAPSHOT branch, it seems that the left.resultTerm in the left.code is 
always assigned a boolean value, so if (!${left.resultTerm}) will never cause a 
syntax error. Therefore I agree with [~zju_zsx] that the logic can be safely 
simplified.



[jira] [Commented] (FLINK-31848) And Operator has side effect when operands have udf

2023-04-19 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17713985#comment-17713985
 ] 

Shuiqiang Chen commented on FLINK-31848:


And without the null check, it can be simplified to if (!${left.resultTerm}), 
as you mentioned, like line 671 in ScalarOperatorGens.



[jira] [Commented] (FLINK-31848) And Operator has side effect when operands have udf

2023-04-19 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17713984#comment-17713984
 ] 

Shuiqiang Chen commented on FLINK-31848:


Hi,
The code block is in the branch where a null check is required, so the value of 
left.resultTerm might be null, and !${left.resultTerm} would then cause a syntax 
error.



[jira] [Commented] (FLINK-31313) Unsupported meta columns in column list of insert statement

2023-03-03 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17696112#comment-17696112
 ] 

Shuiqiang Chen commented on FLINK-31313:


Hi [~lincoln.86xy], I think it is the same issue as FLINK-30922.

> Unsupported meta columns in column list of insert statement
> ---
>
> Key: FLINK-31313
> URL: https://issues.apache.org/jira/browse/FLINK-31313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.16.1
>Reporter: lincoln lee
>Priority: Major
>
> Currently an error is raised when metadata columns are referenced in the 
> column list of an insert statement, e.g.,
> {code}
> INSERT INTO sink (a,b,f) -- here `f` is a metadata column of sink table
> SELECT ...{code}
> {code}
> Caused by: org.apache.calcite.runtime.CalciteContextException: At line 1, 
> column 44: Unknown target column 'f'
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:932)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:917)
>   at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.newValidationError(PreValidateReWriter.scala:440)
>   at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.validateField(PreValidateReWriter.scala:428)
>   at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:169)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at scala.collection.Iterator.foreach(Iterator.scala:937)
>   at scala.collection.Iterator.foreach$(Iterator.scala:937)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:161)
>   at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.rewriteInsert(PreValidateReWriter.scala:72)
> {code}
> The cause is that the current PreValidateReWriter uses the physical types of 
> the sink table in the validation phase, which do not include metadata columns.





[jira] [Commented] (FLINK-31124) Add it case for HiveTableSink speculative execution

2023-02-22 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692146#comment-17692146
 ] 

Shuiqiang Chen commented on FLINK-31124:


Never mind, it doesn't block the contribution.

> Add it case for HiveTableSink speculative execution
> ---
>
> Key: FLINK-31124
> URL: https://issues.apache.org/jira/browse/FLINK-31124
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Tests
>Reporter: Biao Liu
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
>
> The part of HiveTableSink has supported speculative execution in 
> https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some 
> integration test cases for this feature.





[jira] [Commented] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test

2023-02-22 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692076#comment-17692076
 ] 

Shuiqiang Chen commented on FLINK-31120:


I agree with [~TsReaper] that the variable does not need to be static since it 
is accessed only by the current environment. Or is there any other consideration?

> ConcurrentModificationException occurred in StringFunctionsITCase.test
> --
>
> Key: FLINK-31120
> URL: https://issues.apache.org/jira/browse/FLINK-31120
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Assignee: Shuiqiang Chen
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334
> {code}
> Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 10.725 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.StringFunctionsITCase
> Feb 17 04:51:25 [ERROR] 
> org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
>  Time elapsed: 4.367 s <<< ERROR!
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute 
> sql
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}





[jira] [Commented] (FLINK-31124) Add it case for HiveTableSink speculative execution

2023-02-18 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690754#comment-17690754
 ] 

Shuiqiang Chen commented on FLINK-31124:


Thank you [~zhuzh], I have created the PR and would be grateful if you could 
help review it. BTW, the assignee username is not my valid account; my current 
JIRA id is `csq`.

> Add it case for HiveTableSink speculative execution
> ---
>
> Key: FLINK-31124
> URL: https://issues.apache.org/jira/browse/FLINK-31124
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: chenshuiqiang
>Priority: Major
>  Labels: pull-request-available
>
> The part of HiveTableSink has supported speculative execution in 
> https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some 
> integration test cases for this feature.





[jira] [Comment Edited] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test

2023-02-17 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690523#comment-17690523
 ] 

Shuiqiang Chen edited comment on FLINK-31120 at 2/17/23 6:17 PM:
-

Maybe it needs concurrency control for access to the static field 
`collectIterators` in `StreamExecutionEnvironment`. Four test cases execute 
concurrently when running StringFunctionsITCase, so one thread may add an 
iterator through `registerCollectIterator` while another thread is inside the 
foreach loop in executeAsync().


was (Author: csq):
Maybe tt needs concurrent control for access to the static field 
`collectIterators` in `StreamExecutionEnvironment`. There are four test cases 
executing when running StringFunctionsITCase in a concurrent execution mode, 
that has chance  for a thread to add a collectorIterator through 
`registerCollectIterator` while there is a foreach loop in executeAsync() in 
another thread.

> ConcurrentModificationException occurred in StringFunctionsITCase.test
> --
>
> Key: FLINK-31120
> URL: https://issues.apache.org/jira/browse/FLINK-31120
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334
> {code}
> Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 10.725 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.StringFunctionsITCase
> Feb 17 04:51:25 [ERROR] 
> org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
>  Time elapsed: 4.367 s <<< ERROR!
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute 
> sql
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}





[jira] [Commented] (FLINK-31124) Add it case for HiveTableSink speculative execution

2023-02-17 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690524#comment-17690524
 ] 

Shuiqiang Chen commented on FLINK-31124:


Hi [~zhuzh], I would like to help finish this issue. Could you please assign it 
to me?

> Add it case for HiveTableSink speculative execution
> ---
>
> Key: FLINK-31124
> URL: https://issues.apache.org/jira/browse/FLINK-31124
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Priority: Minor
>
> The part of HiveTableSink has supported speculative execution in 
> https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some 
> integration test cases for this feature.





[jira] [Commented] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test

2023-02-17 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690523#comment-17690523
 ] 

Shuiqiang Chen commented on FLINK-31120:


Maybe it needs concurrency control for access to the static field 
`collectIterators` in `StreamExecutionEnvironment`. Four test cases execute 
concurrently when running StringFunctionsITCase, so one thread may add an 
iterator through `registerCollectIterator` while another thread is inside the 
foreach loop in executeAsync().
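
The failure mode described above can be sketched in plain Java. This is a single-threaded stand-in for the race, assuming the shared collection behaves like an `ArrayList`; the class `CollectIteratorRace` and its method are hypothetical names and are not Flink code. Swapping in a `CopyOnWriteArrayList` is shown only as one possible form of concurrency control, not as the fix that was applied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical illustration of adding to a list while a foreach loop iterates it.
final class CollectIteratorRace {

    /**
     * Iterates over the list and registers one more element part-way through,
     * as a second thread could do. Returns true if the iteration failed with
     * a ConcurrentModificationException.
     */
    static boolean failsWhenModifiedDuringIteration(List<String> iterators) {
        try {
            for (String it : iterators) {
                if (it.equals("first")) {
                    iterators.add("late"); // stands in for registerCollectIterator
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> plain = new ArrayList<>(Arrays.asList("first", "second"));
        List<String> copyOnWrite = new CopyOnWriteArrayList<>(Arrays.asList("first", "second"));
        System.out.println("ArrayList fails: " + failsWhenModifiedDuringIteration(plain));
        System.out.println("CopyOnWriteArrayList fails: " + failsWhenModifiedDuringIteration(copyOnWrite));
    }
}
```

A copy-on-write list iterates over a snapshot, so the late registration is not seen by the running loop; whether that is acceptable depends on what executeAsync() expects, which is why making the field non-static is the simpler option discussed in this thread.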

> ConcurrentModificationException occurred in StringFunctionsITCase.test
> --
>
> Key: FLINK-31120
> URL: https://issues.apache.org/jira/browse/FLINK-31120
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334
> {code}
> Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 10.725 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.StringFunctionsITCase
> Feb 17 04:51:25 [ERROR] 
> org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
>  Time elapsed: 4.367 s <<< ERROR!
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute 
> sql
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}





[jira] [Comment Edited] (FLINK-30966) Flink SQL IF FUNCTION logic error

2023-02-12 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687683#comment-17687683
 ] 

Shuiqiang Chen edited comment on FLINK-30966 at 2/13/23 2:51 AM:
-

[~luoyuxia] The matter of 
[FLINK-31003|https://issues.apache.org/jira/browse/FLINK-31003] is return type 
inference, while there is another issue here: the position of the code block 
that casts the result term might be wrong.


was (Author: csq):
[~luoyuxia]The matter of 
[FLINK-31003|https://issues.apache.org/jira/browse/FLINK-31003] is return type 
inferencing, while there is another issue that the code block of result term 
casting might be wrong.

> Flink SQL IF FUNCTION logic error
> -
>
> Key: FLINK-30966
> URL: https://issues.apache.org/jira/browse/FLINK-30966
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.16.1
>Reporter: 谢波
>Priority: Major
>
> my data is 
> {code:java}
> //
> { "before": { "status": "sent" }, "after": { "status": "succeed" }, "op": 
> "u", "ts_ms": 1671926400225, "transaction": null } {code}
> my sql is 
>  
> {code:java}
> CREATE TABLE t
> (
> before ROW (
> status varchar (32)
> ),
> after ROW (
> status varchar (32)
> ),
> ts_ms bigint,
> op   string,
> kafka_timestamp  timestamp METADATA FROM 'timestamp',
> -- @formatter:off
> proctime AS PROCTIME()
> -- @formatter:on
> ) WITH (
> 'connector' = 'kafka',
> -- 'topic' = '',
> 'topic' = 'test',
> 'properties.bootstrap.servers' = ' ',
> 'properties.group.id' = '',
> 'format' = 'json',
> 'scan.topic-partition-discovery.interval' = '60s',
> 'scan.startup.mode' = 'earliest-offset',
> 'json.ignore-parse-errors' = 'true'
>  );
> create table p
> (
> status  STRING ,
> before_status  STRING ,
> after_status  STRING ,
> metadata_operation  STRING COMMENT '源记录操作类型',
> dt  STRING
> )WITH (
> 'connector' = 'print'
>  );
> INSERT INTO p
> SELECT
>IF(op <> 'd', after.status, before.status),
> before.status,
> after.status,
>op AS metadata_operation,
>DATE_FORMAT(kafka_timestamp, '-MM-dd') AS dt
> FROM t;
>  {code}
>  my local env output is 
>  
>  
> {code:java}
> +I[null, sent, succeed, u, 2023-02-08] {code}
>  
>  my production env output is 
> {code:java}
> +I[sent, sent, succeed, u, 2023-02-08]  {code}
> Why?
> This looks like a bug.
>  





[jira] [Commented] (FLINK-30966) Flink SQL IF FUNCTION logic error

2023-02-12 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687683#comment-17687683
 ] 

Shuiqiang Chen commented on FLINK-30966:


[~luoyuxia] The matter of 
[FLINK-31003|https://issues.apache.org/jira/browse/FLINK-31003] is return type 
inference, while there is another issue here: the code block that casts the 
result term might be wrong.

> Flink SQL IF FUNCTION logic error
> -
>
> Key: FLINK-30966
> URL: https://issues.apache.org/jira/browse/FLINK-30966
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.16.1
>Reporter: 谢波
>Priority: Major
>
> my data is 
> {code:java}
> //
> { "before": { "status": "sent" }, "after": { "status": "succeed" }, "op": 
> "u", "ts_ms": 1671926400225, "transaction": null } {code}
> my sql is 
>  
> {code:java}
> CREATE TABLE t
> (
> before ROW (
> status varchar (32)
> ),
> after ROW (
> status varchar (32)
> ),
> ts_ms bigint,
> op   string,
> kafka_timestamp  timestamp METADATA FROM 'timestamp',
> -- @formatter:off
> proctime AS PROCTIME()
> -- @formatter:on
> ) WITH (
> 'connector' = 'kafka',
> -- 'topic' = '',
> 'topic' = 'test',
> 'properties.bootstrap.servers' = ' ',
> 'properties.group.id' = '',
> 'format' = 'json',
> 'scan.topic-partition-discovery.interval' = '60s',
> 'scan.startup.mode' = 'earliest-offset',
> 'json.ignore-parse-errors' = 'true'
>  );
> create table p
> (
> status  STRING ,
> before_status  STRING ,
> after_status  STRING ,
> metadata_operation  STRING COMMENT '源记录操作类型',
> dt  STRING
> )WITH (
> 'connector' = 'print'
>  );
> INSERT INTO p
> SELECT
>IF(op <> 'd', after.status, before.status),
> before.status,
> after.status,
>op AS metadata_operation,
>DATE_FORMAT(kafka_timestamp, '-MM-dd') AS dt
> FROM t;
>  {code}
>  my local env output is 
>  
>  
> {code:java}
> +I[null, sent, succeed, u, 2023-02-08] {code}
>  
>  my production env output is 
> {code:java}
> +I[sent, sent, succeed, u, 2023-02-08]  {code}
> Why?
> This looks like a bug.
>  





[jira] [Commented] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect

2023-02-12 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687682#comment-17687682
 ] 

Shuiqiang Chen commented on FLINK-31003:


[~luoyuxia] +1

> Flink SQL IF / CASE WHEN Funcation incorrect
> 
>
> Key: FLINK-31003
> URL: https://issues.apache.org/jira/browse/FLINK-31003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1
>Reporter: weiqinpan
>Priority: Major
>
> When I execute the SQL below using sql-client, I found something wrong.
>  
> {code:java}
> CREATE TEMPORARY TABLE source (
>   mktgmsg_biz_type STRING,
>   marketing_flow_id STRING,
>   mktgmsg_campaign_id STRING
> )
> WITH
> (
>   'connector' = 'filesystem',
>   'path' = 'file:///Users/xxx/Desktop/demo.json',
>   'format' = 'json'
> ); 
> -- return correct value('marketing_flow_id') 
> SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM 
> source;
> -- return incorrect value('')
> SELECT IF(`marketing_flow_id` IS  NULL, '', `marketing_flow_id`) FROM 
> source;{code}
> The demo.json data is 
>  
> {code:java}
> {"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": 
> "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"} {code}
>  
>  
> BTW, using CASE WHEN with IF / IFNULL also gives wrong results.
>  
> {code:java}
> -- return wrong value(''), expect return marketing_flow_id
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN IF(`marketing_flow_id` 
> IS NULL, `marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`)
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return wrong value('')
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return correct value, the difference is [else return ' ']
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ' '
>   END AS `message_campaign_instance_id` FROM source;
> {code}
>  
>  
>  





[jira] [Comment Edited] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect

2023-02-10 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687347#comment-17687347
 ] 

Shuiqiang Chen edited comment on FLINK-31003 at 2/11/23 2:51 AM:
-

Hi [~martijnvisser] [~weiqinpan], I think it is the same issue as 
[FLINK-30966|https://issues.apache.org/jira/browse/FLINK-30966]: when 
normalizing arguments in IfCallGen, it always aligns to the type of ARG1. For 
example, IF(1 > 2, 'true', 'false') returns the string 'fals', whose length is 
the same as that of 'true'.


was (Author: csq):
Hi [~martijnvisser][~weiqinpan], I think it is the same issue as [FLINK-30966 
title|https://issues.apache.org/jira/browse/FLINK-30966], that
when normalizing arguments in IfCallGen, it always align to the type of ARG1, 
like IF(1 > 2, 'true', 'false')
the result will be string 'fals' which length is the same as 'true'.

> Flink SQL IF / CASE WHEN Funcation incorrect
> 
>
> Key: FLINK-31003
> URL: https://issues.apache.org/jira/browse/FLINK-31003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1
>Reporter: weiqinpan
>Priority: Major
>
> When I execute the SQL below using sql-client, I found something wrong.
>  
> {code:java}
> CREATE TEMPORARY TABLE source (
>   mktgmsg_biz_type STRING,
>   marketing_flow_id STRING,
>   mktgmsg_campaign_id STRING
> )
> WITH
> (
>   'connector' = 'filesystem',
>   'path' = 'file:///Users/xxx/Desktop/demo.json',
>   'format' = 'json'
> ); 
> -- return correct value('marketing_flow_id') 
> SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM 
> source;
> -- return incorrect value('')
> SELECT IF(`marketing_flow_id` IS  NULL, '', `marketing_flow_id`) FROM 
> source;{code}
> The demo.json data is 
>  
> {code:java}
> {"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": 
> "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"} {code}
>  
>  
> BTW, using CASE WHEN with IF / IFNULL also gives wrong results.
>  
> {code:java}
> -- return wrong value(''), expect return marketing_flow_id
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN IF(`marketing_flow_id` 
> IS NULL, `marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`)
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return wrong value('')
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return correct value, the difference is [else return ' ']
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ' '
>   END AS `message_campaign_instance_id` FROM source;
> {code}
>  
>  
>  





[jira] [Commented] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect

2023-02-10 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687347#comment-17687347
 ] 

Shuiqiang Chen commented on FLINK-31003:


Hi [~martijnvisser] [~weiqinpan], I think it is the same issue as 
[FLINK-30966|https://issues.apache.org/jira/browse/FLINK-30966]: when 
normalizing arguments in IfCallGen, it always aligns to the type of ARG1. For 
example, IF(1 > 2, 'true', 'false') returns the string 'fals', whose length is 
the same as that of 'true'.
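
The truncation can be mimicked with a small Java sketch. It assumes that a string literal is typed by its own length and that casting to a shorter character type cuts the value off; `IfArgNormalization` and its methods are hypothetical names, and the widest-type variant is shown only for contrast, not as the fix chosen in the PR.

```java
// Hypothetical illustration of aligning both IF branches to one string length.
final class IfArgNormalization {

    /** Mimics a cast to a character type of the given length: longer values are cut off. */
    static String castToLength(String value, int length) {
        return value.length() <= length ? value : value.substring(0, length);
    }

    /** Behavior described in the comment: the result takes the type of the first branch. */
    static String ifAlignedToFirst(boolean condition, String first, String second) {
        int length = first.length();
        return castToLength(condition ? first : second, length);
    }

    /** Contrast: align to the wider of the two branch types, so nothing is cut off. */
    static String ifAlignedToWidest(boolean condition, String first, String second) {
        int length = Math.max(first.length(), second.length());
        return castToLength(condition ? first : second, length);
    }

    public static void main(String[] args) {
        System.out.println(ifAlignedToFirst(1 > 2, "true", "false"));  // prints "fals"
        System.out.println(ifAlignedToWidest(1 > 2, "true", "false")); // prints "false"
    }
}
```

The same mechanism would explain the empty results reported in this issue: when the first branch is the literal '', the aligned length is zero and any value from the other branch is cut down to ''.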

> Flink SQL IF / CASE WHEN Funcation incorrect
> 
>
> Key: FLINK-31003
> URL: https://issues.apache.org/jira/browse/FLINK-31003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1
>Reporter: weiqinpan
>Priority: Major
>
> When I execute the SQL below using sql-client, I found something wrong.
>  
> {code:java}
> CREATE TEMPORARY TABLE source (
>   mktgmsg_biz_type STRING,
>   marketing_flow_id STRING,
>   mktgmsg_campaign_id STRING
> )
> WITH
> (
>   'connector' = 'filesystem',
>   'path' = 'file:///Users/xxx/Desktop/demo.json',
>   'format' = 'json'
> ); 
> -- return correct value('marketing_flow_id') 
> SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM 
> source;
> -- return incorrect value('')
> SELECT IF(`marketing_flow_id` IS  NULL, '', `marketing_flow_id`) FROM 
> source;{code}
> The demo.json data is 
>  
> {code:java}
> {"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": 
> "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"} {code}
>  
>  
> BTW, using CASE WHEN with IF / IFNULL also gives wrong results.
>  
> {code:java}
> -- return wrong value(''), expect return marketing_flow_id
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN IF(`marketing_flow_id` 
> IS NULL, `marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`)
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return wrong value('')
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return correct value, the difference is [else return ' ']
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ' '
>   END AS `message_campaign_instance_id` FROM source;
> {code}
>  
>  
>  





[jira] [Commented] (FLINK-30966) Flink SQL IF FUNCTION logic error

2023-02-09 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686843#comment-17686843
 ] 

Shuiqiang Chen commented on FLINK-30966:


BTW, the expected result of your query might be:
+I[succeed, sent, succeed, u, 2023-02-08]

> Flink SQL IF FUNCTION logic error
> -
>
> Key: FLINK-30966
> URL: https://issues.apache.org/jira/browse/FLINK-30966
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.16.1
>Reporter: 谢波
>Priority: Major
>
> my data is 
> {code:java}
> //
> { "before": { "status": "sent" }, "after": { "status": "succeed" }, "op": 
> "u", "ts_ms": 1671926400225, "transaction": null } {code}
> my sql is 
>  
> {code:java}
> CREATE TABLE t
> (
> before ROW (
> status varchar (32)
> ),
> after ROW (
> status varchar (32)
> ),
> ts_ms bigint,
> op   string,
> kafka_timestamp  timestamp METADATA FROM 'timestamp',
> -- @formatter:off
> proctime AS PROCTIME()
> -- @formatter:on
> ) WITH (
> 'connector' = 'kafka',
> -- 'topic' = '',
> 'topic' = 'test',
> 'properties.bootstrap.servers' = ' ',
> 'properties.group.id' = '',
> 'format' = 'json',
> 'scan.topic-partition-discovery.interval' = '60s',
> 'scan.startup.mode' = 'earliest-offset',
> 'json.ignore-parse-errors' = 'true'
>  );
> create table p
> (
> status  STRING ,
> before_status  STRING ,
> after_status  STRING ,
> metadata_operation  STRING COMMENT '源记录操作类型',
> dt  STRING
> )WITH (
> 'connector' = 'print'
>  );
> INSERT INTO p
> SELECT
>IF(op <> 'd', after.status, before.status),
> before.status,
> after.status,
>op AS metadata_operation,
>DATE_FORMAT(kafka_timestamp, '-MM-dd') AS dt
> FROM t;
>  {code}
>  my local env output is 
>  
>  
> {code:java}
> +I[null, sent, succeed, u, 2023-02-08] {code}
>  
>  my production env output is 
> {code:java}
> +I[sent, sent, succeed, u, 2023-02-08]  {code}
> Why?
> This looks like a bug.
>  





[jira] [Comment Edited] (FLINK-30966) Flink SQL IF FUNCTION logic error

2023-02-09 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686842#comment-17686842
 ] 

Shuiqiang Chen edited comment on FLINK-30966 at 2/10/23 2:39 AM:
-

Hi [~hiscat], I have reproduced the same error and it seems to be a bug in 
IfCallGen. After investigating the generated code, I found two problems:
1. It performs the result term casting before the calculation logic, so the 
actual result always refers to a non-initialized field.
2. When normalizing arguments, it always aligns to the type of ARG1. For 
example, IF(1 > 2, 'true', 'false') returns the string 'fals', whose length is 
the same as that of 'true'.

I would like to help fix it. 
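
The first problem can be pictured with a hand-written Java sketch of the statement ordering. This is not the code that IfCallGen generates; the class `CastOrdering` and the variable names are hypothetical, and the cast is reduced to a plain assignment to keep the ordering visible.

```java
// Hypothetical illustration of emitting the cast before the value is computed.
final class CastOrdering {

    /** Ordering described in problem 1: the cast reads the field before it is assigned. */
    static String castBeforeCompute(boolean condition, String first, String second) {
        String result = null;               // declared, not yet computed
        String casted = result;             // cast emitted too early, sees the empty field
        result = condition ? first : second; // calculation happens only afterwards
        return casted;
    }

    /** Intended ordering: compute the IF result first, then cast it. */
    static String computeBeforeCast(boolean condition, String first, String second) {
        String result = condition ? first : second;
        String casted = result;
        return casted;
    }

    public static void main(String[] args) {
        // op = "u", after.status = "succeed", before.status = "sent", as in the reported data
        boolean notDelete = !"u".equals("d");
        System.out.println(castBeforeCompute(notDelete, "succeed", "sent"));
        System.out.println(computeBeforeCast(notDelete, "succeed", "sent"));
    }
}
```

With the early cast the first column comes out as null, which matches the local output in the report, while the intended ordering yields succeed.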


was (Author: csq):
Hi [~hiscat], I have reproduced the same error and it seems a bug in IFCallGen.
There are two problems.
1. It do the result term casting before the calculation logic, and finally the 
actual result always refer to a non-initialized field.
2. when normalizing arguments, it always align to the type of ARG1, like IF(1 > 
 2, 'true', 'false')
the result will be string 'fals' which length is the same as 'true'.

I would like to help fix it. 

> Flink SQL IF FUNCTION logic error
> -
>
> Key: FLINK-30966
> URL: https://issues.apache.org/jira/browse/FLINK-30966
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.16.1
>Reporter: 谢波
>Priority: Major
>
> my data is 
> {code:java}
> //
> { "before": { "status": "sent" }, "after": { "status": "succeed" }, "op": 
> "u", "ts_ms": 1671926400225, "transaction": null } {code}
> my sql is 
>  
> {code:java}
> CREATE TABLE t
> (
> before ROW (
> status varchar (32)
> ),
> after ROW (
> status varchar (32)
> ),
> ts_ms bigint,
> op   string,
> kafka_timestamp  timestamp METADATA FROM 'timestamp',
> -- @formatter:off
> proctime AS PROCTIME()
> -- @formatter:on
> ) WITH (
> 'connector' = 'kafka',
> -- 'topic' = '',
> 'topic' = 'test',
> 'properties.bootstrap.servers' = ' ',
> 'properties.group.id' = '',
> 'format' = 'json',
> 'scan.topic-partition-discovery.interval' = '60s',
> 'scan.startup.mode' = 'earliest-offset',
> 'json.ignore-parse-errors' = 'true'
>  );
> create table p
> (
> status  STRING ,
> before_status  STRING ,
> after_status  STRING ,
> metadata_operation  STRING COMMENT '源记录操作类型',
> dt  STRING
> )WITH (
> 'connector' = 'print'
>  );
> INSERT INTO p
> SELECT
>IF(op <> 'd', after.status, before.status),
> before.status,
> after.status,
>op AS metadata_operation,
> DATE_FORMAT(kafka_timestamp, 'yyyy-MM-dd') AS dt
> FROM t;
>  {code}
>  my local env output is 
>  
>  
> {code:java}
> +I[null, sent, succeed, u, 2023-02-08] {code}
>  
>  my production env output is 
> {code:java}
> +I[sent, sent, succeed, u, 2023-02-08]  {code}
> Why?  
> This looks like a bug.
>  





[jira] [Comment Edited] (FLINK-30966) Flink SQL IF FUNCTION logic error

2023-02-09 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686842#comment-17686842
 ] 

Shuiqiang Chen edited comment on FLINK-30966 at 2/10/23 2:38 AM:
-

Hi [~hiscat], I have reproduced the same error, and it seems to be a bug in IFCallGen.
There are two problems:
1. It performs the result term casting before the calculation logic, so the 
actual result always refers to an uninitialized field.
2. When normalizing arguments, it always aligns them to the type of ARG1. For 
example, for IF(1 > 2, 'true', 'false')
the result will be the string 'fals', whose length is the same as that of 'true'.

I would like to help fix it. 


was (Author: csq):
Hi [~hiscat], I have reproduced the same error and it seems a bug in IFCodeGen.
There are two problems.
1. It do the result term casting before the calculation logic, and finally the 
actual result always refer to a non-initialized field.
2. when normalizing arguments, it always align to the type of ARG1, like IF(1 > 
 2, 'true', 'false')
the result will be string 'fals' which length is the same as 'true'.

I would like to help fix it. 

> Flink SQL IF FUNCTION logic error
> -
>
> Key: FLINK-30966
> URL: https://issues.apache.org/jira/browse/FLINK-30966
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.16.1
>Reporter: 谢波
>Priority: Major
>
> my data is 
> {code:java}
> //
> { "before": { "status": "sent" }, "after": { "status": "succeed" }, "op": 
> "u", "ts_ms": 1671926400225, "transaction": null } {code}
> my sql is 
>  
> {code:java}
> CREATE TABLE t
> (
> before ROW (
> status varchar (32)
> ),
> after ROW (
> status varchar (32)
> ),
> ts_ms bigint,
> op   string,
> kafka_timestamp  timestamp METADATA FROM 'timestamp',
> -- @formatter:off
> proctime AS PROCTIME()
> -- @formatter:on
> ) WITH (
> 'connector' = 'kafka',
> -- 'topic' = '',
> 'topic' = 'test',
> 'properties.bootstrap.servers' = ' ',
> 'properties.group.id' = '',
> 'format' = 'json',
> 'scan.topic-partition-discovery.interval' = '60s',
> 'scan.startup.mode' = 'earliest-offset',
> 'json.ignore-parse-errors' = 'true'
>  );
> create table p
> (
> status  STRING ,
> before_status  STRING ,
> after_status  STRING ,
> metadata_operation  STRING COMMENT '源记录操作类型',
> dt  STRING
> )WITH (
> 'connector' = 'print'
>  );
> INSERT INTO p
> SELECT
>IF(op <> 'd', after.status, before.status),
> before.status,
> after.status,
>op AS metadata_operation,
> DATE_FORMAT(kafka_timestamp, 'yyyy-MM-dd') AS dt
> FROM t;
>  {code}
>  my local env output is 
>  
>  
> {code:java}
> +I[null, sent, succeed, u, 2023-02-08] {code}
>  
>  my production env output is 
> {code:java}
> +I[sent, sent, succeed, u, 2023-02-08]  {code}
> Why?  
> This looks like a bug.
>  





[jira] [Commented] (FLINK-30966) Flink SQL IF FUNCTION logic error

2023-02-09 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686842#comment-17686842
 ] 

Shuiqiang Chen commented on FLINK-30966:


Hi [~hiscat], I have reproduced the same error, and it seems to be a bug in IFCodeGen.
There are two problems:
1. It performs the result term casting before the calculation logic, so the 
actual result always refers to an uninitialized field.
2. When normalizing arguments, it always aligns them to the type of ARG1. For 
example, for IF(1 > 2, 'true', 'false')
the result will be the string 'fals', whose length is the same as that of 'true'.

I would like to help fix it. 

> Flink SQL IF FUNCTION logic error
> -
>
> Key: FLINK-30966
> URL: https://issues.apache.org/jira/browse/FLINK-30966
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.16.1
>Reporter: 谢波
>Priority: Major
>
> my data is 
> {code:java}
> //
> { "before": { "status": "sent" }, "after": { "status": "succeed" }, "op": 
> "u", "ts_ms": 1671926400225, "transaction": null } {code}
> my sql is 
>  
> {code:java}
> CREATE TABLE t
> (
> before ROW (
> status varchar (32)
> ),
> after ROW (
> status varchar (32)
> ),
> ts_ms bigint,
> op   string,
> kafka_timestamp  timestamp METADATA FROM 'timestamp',
> -- @formatter:off
> proctime AS PROCTIME()
> -- @formatter:on
> ) WITH (
> 'connector' = 'kafka',
> -- 'topic' = '',
> 'topic' = 'test',
> 'properties.bootstrap.servers' = ' ',
> 'properties.group.id' = '',
> 'format' = 'json',
> 'scan.topic-partition-discovery.interval' = '60s',
> 'scan.startup.mode' = 'earliest-offset',
> 'json.ignore-parse-errors' = 'true'
>  );
> create table p
> (
> status  STRING ,
> before_status  STRING ,
> after_status  STRING ,
> metadata_operation  STRING COMMENT '源记录操作类型',
> dt  STRING
> )WITH (
> 'connector' = 'print'
>  );
> INSERT INTO p
> SELECT
>IF(op <> 'd', after.status, before.status),
> before.status,
> after.status,
>op AS metadata_operation,
> DATE_FORMAT(kafka_timestamp, 'yyyy-MM-dd') AS dt
> FROM t;
>  {code}
>  my local env output is 
>  
>  
> {code:java}
> +I[null, sent, succeed, u, 2023-02-08] {code}
>  
>  my production env output is 
> {code:java}
> +I[sent, sent, succeed, u, 2023-02-08]  {code}
> Why?  
> This looks like a bug.
>  





[jira] [Commented] (FLINK-30922) SQL validate fail in parsing writable metadata

2023-02-08 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686155#comment-17686155
 ] 

Shuiqiang Chen commented on FLINK-30922:


I have created a pull request to fix the issue. I would highly appreciate it 
if anyone could help review the PR.

> SQL validate fail in parsing writable metadata
> --
>
> Key: FLINK-30922
> URL: https://issues.apache.org/jira/browse/FLINK-30922
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
>  Labels: pull-request-available
>
> When I tried a simple demo SQL that writes metadata to Kafka in the Flink 
> SQL client
> {code:java}
> CREATE TABLE KafkaTable (
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING,
>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
> INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code}
>  
> it throws an error
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse 
> statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP;
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown 
> Source) ~[?:?]
>         at 
> org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
> [flink-sql-client-1.16.1.jar:1.16.1]
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
> failed. From line 1, column 33 to line 1, column 34: Unknown target column 
> 'ts'
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>  ~[?:?]
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         ... 13 more
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 33 to line 1, column 34: Unknown target column 'ts'
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method) ~[?:1.8.0_41]
>         at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  ~[?:1.8.0_41]
>         at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  ~[?:1.8.0_41]
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:422) 
> ~[?:1.8.0_41]
>         at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) 
> 

[jira] [Commented] (FLINK-30922) SQL validate fail in parsing writable metadata

2023-02-08 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17685762#comment-17685762
 ] 

Shuiqiang Chen commented on FLINK-30922:


Hi [~tanjialiang], thank you for reporting the issue. I have reproduced the 
same error with the code you provided. 
According to 
[FLIP-107|https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors],
 it is possible to write metadata columns in SQL.

The error is caused by a bug in appendPartitionAndNullsProjects in 
PreValidateReWriter: it excludes all computed columns and metadata columns, 
whereas it is expected to include all persisted columns. 
I would like to fix it. 
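
The difference between the two column filters can be sketched as follows. This is a simplified model under stated assumptions, not the code in PreValidateReWriter: the Column and Kind types and both filter methods are hypothetical, and a metadata column is treated as persisted unless it is declared VIRTUAL.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Simplified model of which columns may be targeted by INSERT INTO; hypothetical types. */
final class PersistedColumnsSketch {

    enum Kind { PHYSICAL, COMPUTED, METADATA, VIRTUAL_METADATA }

    static final class Column {
        final String name;
        final Kind kind;

        Column(String name, Kind kind) {
            this.name = name;
            this.kind = kind;
        }

        /** Physical columns and non-virtual metadata columns are written to the sink. */
        boolean isPersisted() {
            return kind == Kind.PHYSICAL || kind == Kind.METADATA;
        }
    }

    /** Behaviour described as the bug: every computed and metadata column is dropped. */
    static List<String> physicalOnly(List<Column> columns) {
        return columns.stream()
                .filter(c -> c.kind == Kind.PHYSICAL)
                .map(c -> c.name)
                .collect(Collectors.toList());
    }

    /** Expected behaviour: keep every persisted column, including writable metadata. */
    static List<String> persisted(List<Column> columns) {
        return columns.stream()
                .filter(Column::isPersisted)
                .map(c -> c.name)
                .collect(Collectors.toList());
    }

    /** Columns of the KafkaTable from the report; `ts` is a writable metadata column. */
    static List<Column> kafkaTable() {
        return Arrays.asList(
                new Column("user_id", Kind.PHYSICAL),
                new Column("item_id", Kind.PHYSICAL),
                new Column("behavior", Kind.PHYSICAL),
                new Column("ts", Kind.METADATA));
    }
}
```

In this model, physicalOnly(kafkaTable()) omits `ts`, which corresponds to the "Unknown target column 'ts'" validation error, while persisted(kafkaTable()) keeps it.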

> SQL validate fail in parsing writable metadata
> --
>
> Key: FLINK-30922
> URL: https://issues.apache.org/jira/browse/FLINK-30922
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
>
> When I tried a simple demo SQL that writes metadata to Kafka in the Flink 
> SQL client
> {code:java}
> CREATE TABLE KafkaTable (
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING,
>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
> INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code}
>  
> it throws an error
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse 
> statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP;
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown 
> Source) ~[?:?]
>         at 
> org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
> [flink-sql-client-1.16.1.jar:1.16.1]
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
> failed. From line 1, column 33 to line 1, column 34: Unknown target column 
> 'ts'
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>  ~[?:?]
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         ... 13 more
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 33 to line 1, column 34: Unknown target column 'ts'
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method) ~[?:1.8.0_41]
>         at 
> 

[jira] [Commented] (FLINK-30817) ClassCastException in TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown

2023-02-01 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682985#comment-17682985
 ] 

Shuiqiang Chen commented on FLINK-30817:


[~luoyuxia] I would like to help fix the issue. Please assign the Jira ticket 
to me.

> ClassCastException in 
> TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown
> ---
>
> Key: FLINK-30817
> URL: https://issues.apache.org/jira/browse/FLINK-30817
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Shuiqiang Chen
>Priority: Minor
>
> When applying partitions in 
> TestValuesScanTableSourceWithoutProjectionPushDown with no partition 
> provided, the following code will cause ClassCastException
> {code:java}
>  remainingPartitions = (List<Map<String, String>>) Collections.emptyMap();
>  this.data.put(Collections.emptyMap(), Collections.emptyList());
> {code}
> {code:java}
> java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast 
> to java.util.List
>   at 
> org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown.applyPartitions(TestValuesTableFactory.java:1222)
>   at 
> org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec.apply(PartitionPushDownSpec.java:57)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.onMatch(PushPartitionIntoTableSourceScanRule.java:183)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:343)
> {code}
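
The failure can be reproduced outside Flink with a few lines. The sketch below is an illustration only: the class EmptyPartitionsSketch is hypothetical, and using Collections.emptyList() is one plausible way to express "no partitions", not necessarily the change made in the actual fix.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Stand-alone reproduction of the cast failure; hypothetical class, not Flink code. */
final class EmptyPartitionsSketch {

    /** Mirrors the reported code: an empty Map is cast to a List and fails at run time. */
    @SuppressWarnings("unchecked")
    static List<Map<String, String>> brokenEmptyPartitions() {
        Object empty = Collections.emptyMap();
        // Compiles (with an unchecked warning) but throws ClassCastException when executed,
        // because Collections$EmptyMap does not implement java.util.List.
        return (List<Map<String, String>>) empty;
    }

    /** Type-correct alternative: an empty list of partition specs needs no cast. */
    static List<Map<String, String>> emptyPartitions() {
        return Collections.emptyList();
    }
}
```

The cast compiles because List and Map are both interfaces, so the compiler cannot rule it out; the error only surfaces when applyPartitions is invoked with no partitions.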





[jira] [Updated] (FLINK-30817) ClassCastException in TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown

2023-01-29 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-30817:
---
Description: 
When applying partitions in TestValuesScanTableSourceWithoutProjectionPushDown 
with no partition provided, the following code will cause ClassCastException

{code:java}
 remainingPartitions = (List<Map<String, String>>) Collections.emptyMap();
 this.data.put(Collections.emptyMap(), Collections.emptyList());
{code}


{code:java}
java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast to 
java.util.List

at 
org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown.applyPartitions(TestValuesTableFactory.java:1222)
at 
org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec.apply(PartitionPushDownSpec.java:57)
at 
org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.onMatch(PushPartitionIntoTableSourceScanRule.java:183)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:343)
{code}




  was:
When applying partitions in TestValuesScanTableSourceWithoutProjectionPushDown 
with no partition provided, the following code will cause ClassCastException

{code:java}
 remainingPartitions = (List<Map<String, String>>) Collections.emptyMap();
 this.data.put(Collections.emptyMap(), Collections.emptyList());
{code}


{panel:title=My title}
java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast to 
java.util.List

at 
org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown.applyPartitions(TestValuesTableFactory.java:1222)
at 
org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec.apply(PartitionPushDownSpec.java:57)
at 
org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.onMatch(PushPartitionIntoTableSourceScanRule.java:183)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:343)
{panel}




> ClassCastException in 
> TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown
> ---
>
> Key: FLINK-30817
> URL: https://issues.apache.org/jira/browse/FLINK-30817
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Shuiqiang Chen
>Priority: Minor
>
> When applying partitions in 
> TestValuesScanTableSourceWithoutProjectionPushDown with no partition 
> provided, the following code will cause ClassCastException
> {code:java}
>  remainingPartitions = (List<Map<String, String>>) Collections.emptyMap();
>  this.data.put(Collections.emptyMap(), Collections.emptyList());
> {code}
> {code:java}
> java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast 
> to java.util.List
>   at 
> org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown.applyPartitions(TestValuesTableFactory.java:1222)
>   at 
> org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec.apply(PartitionPushDownSpec.java:57)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.onMatch(PushPartitionIntoTableSourceScanRule.java:183)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:343)
> {code}





[jira] [Updated] (FLINK-30817) ClassCastException in TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown

2023-01-29 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-30817:
---
Priority: Minor  (was: Major)

> ClassCastException in 
> TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown
> ---
>
> Key: FLINK-30817
> URL: https://issues.apache.org/jira/browse/FLINK-30817
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Shuiqiang Chen
>Priority: Minor
>
> When applying partitions in 
> TestValuesScanTableSourceWithoutProjectionPushDown with no partition 
> provided, the following code will cause ClassCastException
> {code:java}
>  remainingPartitions = (List<Map<String, String>>) Collections.emptyMap();
>  this.data.put(Collections.emptyMap(), Collections.emptyList());
> {code}
> {panel:title=My title}
> java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast 
> to java.util.List
>   at 
> org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown.applyPartitions(TestValuesTableFactory.java:1222)
>   at 
> org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec.apply(PartitionPushDownSpec.java:57)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.onMatch(PushPartitionIntoTableSourceScanRule.java:183)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:343)
> {panel}





[jira] [Created] (FLINK-30817) ClassCastException in TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown

2023-01-29 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-30817:
--

 Summary: ClassCastException in 
TestValuesTableFactory.TestValuesScanTableSourceWithoutProjectionPushDown
 Key: FLINK-30817
 URL: https://issues.apache.org/jira/browse/FLINK-30817
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.0, 1.17.0
Reporter: Shuiqiang Chen


When applying partitions in TestValuesScanTableSourceWithoutProjectionPushDown 
with no partition provided, the following code will cause ClassCastException

{code:java}
 remainingPartitions = (List<Map<String, String>>) Collections.emptyMap();
 this.data.put(Collections.emptyMap(), Collections.emptyList());
{code}


{panel:title=My title}
java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast to 
java.util.List

at 
org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown.applyPartitions(TestValuesTableFactory.java:1222)
at 
org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec.apply(PartitionPushDownSpec.java:57)
at 
org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.onMatch(PushPartitionIntoTableSourceScanRule.java:183)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:343)
{panel}







[jira] [Comment Edited] (FLINK-28882) ENCODE return error

2022-09-05 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600556#comment-17600556
 ] 

Shuiqiang Chen edited comment on FLINK-28882 at 9/6/22 3:00 AM:


I have encountered the same problem on the latest master branch (1.17-SNAPSHOT).
 !screenshot-1.png! 
As the screenshot shows, the result type of the ENCODE('kyuubi', 'UTF-8') call 
is BINARY(1), whereas the type of the RexLiteral `X'6b7975756269'` is BINARY(6). 
According to [FLINK-24419|https://issues.apache.org/jira/browse/FLINK-24419], 
the cast code is generated as follows:

{code:java}
/* Example generated code for BINARY(2):

// legacy behavior
((byte[])(inputValue))

// new behavior
((((byte[])(inputValue)).length == 2) ? (((byte[])(inputValue))) : 
(java.util.Arrays.copyOf(((byte[])(inputValue)), 2)))

*/
{code}

The BINARY(6) value is trimmed to BINARY(1) in BinaryToBinaryCastRule during the 
generation of the CAST call.

There are two possible solutions:
1. Apply the legacy cast behaviour (LegacyCastBehaviour.ENABLED in 
TableConfig) when the call is ENCODE;
2. Make the output type of `CAST(X'6b7975756269':BINARY(6)):BINARY(1)` 
BINARY(6), which equals the type of the RexLiteral `X'6b7975756269'`.
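
The trimming can be reproduced with plain Java. The sketch below only mirrors the shape of the generated expression quoted above; the class BinaryCastSketch and its method names are hypothetical and not part of Flink.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Stand-alone model of the two BINARY(n) cast behaviours; hypothetical class, not Flink code. */
final class BinaryCastSketch {

    /** Legacy behaviour: the byte array is passed through unchanged. */
    static byte[] legacyCast(byte[] input) {
        return input;
    }

    /** New behaviour: the array is trimmed or zero-padded to exactly targetLength bytes. */
    static byte[] castToBinary(byte[] input, int targetLength) {
        return input.length == targetLength ? input : Arrays.copyOf(input, targetLength);
    }

    public static void main(String[] args) {
        byte[] encoded = "kyuubi".getBytes(StandardCharsets.UTF_8); // six bytes, X'6b7975756269'
        // Casting the six-byte literal to BINARY(1) keeps only the first byte.
        System.out.println(new String(castToBinary(encoded, 1), StandardCharsets.UTF_8)); // prints "k"
        System.out.println(new String(legacyCast(encoded), StandardCharsets.UTF_8));      // prints "kyuubi"
    }
}
```

This matches the reported symptom: when the result type is BINARY(1), the new cast behaviour keeps only 'k', whereas the legacy behaviour returns all of 'kyuubi'.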



was (Author: csq):
I have encountered the same problem in latest master branch (1.17-SNAPSHOT).
 !screenshot-1.png! 
We can see that the result type of Encode('kyuubi', 'UTF-8') call is BINARY(1). 
However, the type of RexLiteral `X'6b7975756269'` is BINARY(6)). According to 
[FLINK-24419 title|https://issues.apache.org/jira/browse/FLINK-24419]

{code:java}
/* Example generated code for BINARY(2):

// legacy behavior
((byte[])(inputValue))

// new behavior
((((byte[])(inputValue)).length == 2) ? (((byte[])(inputValue))) : 
(java.util.Arrays.copyOf(((byte[])(inputValue)), 2)))

*/
{code}

The BINARY(6) is trimmed to BINARY(1) in BinaryToBinaryCastRule during the 
generation of CAST call.

There are two optional solutions:
1. Apply the legacy cast behaviour with LegacyCastBehaviour.ENABLED in 
TableConfig when it is ENCODE call;
2. Make the output type of  `CAST(X'6b7975756269':BINARY(6)):BINARY(1)` to be 
BINARY(6) that equals to the type of RexLiteral `X'6b7975756269'`.


> ENCODE return error
> ---
>
> Key: FLINK-28882
> URL: https://issues.apache.org/jira/browse/FLINK-28882
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Luning Wang
>Assignee: Shuiqiang Chen
>Priority: Major
> Attachments: screenshot-1.png
>
>
> Run the following in SQL Client, it will return 'k' rather than 'kyuubi' but 
> it returns 'kyuubi' in the 1.14 version.
> {code:java}
> select encode('kyuubi', 'UTF-8') {code}
>  





[jira] [Comment Edited] (FLINK-28882) ENCODE return error

2022-09-05 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600556#comment-17600556
 ] 

Shuiqiang Chen edited comment on FLINK-28882 at 9/6/22 2:56 AM:


I have encountered the same problem on the latest master branch (1.17-SNAPSHOT).
 !screenshot-1.png! 
As the screenshot shows, the result type of the ENCODE('kyuubi', 'UTF-8') call 
is BINARY(1), whereas the type of the RexLiteral `X'6b7975756269'` is BINARY(6). 
According to [FLINK-24419 title|https://issues.apache.org/jira/browse/FLINK-24419], 
the cast code is generated as follows:

{code:java}
/* Example generated code for BINARY(2):

// legacy behavior
((byte[])(inputValue))

// new behavior
((((byte[])(inputValue)).length == 2) ? (((byte[])(inputValue))) : 
(java.util.Arrays.copyOf(((byte[])(inputValue)), 2)))

*/
{code}

The BINARY(6) value is trimmed to BINARY(1) in BinaryToBinaryCastRule during the 
generation of the CAST call.

There are two possible solutions:
1. Apply the legacy cast behaviour (LegacyCastBehaviour.ENABLED in 
TableConfig) when the call is ENCODE;
2. Make the output type of `CAST(X'6b7975756269':BINARY(6)):BINARY(1)` 
BINARY(6), which equals the type of the RexLiteral `X'6b7975756269'`.



was (Author: csq):
I have encountered the same problem in latest master branch (1.17-SNAPSHOT).
 !screenshot-1.png! 
We can see that the result type of Encode('kyuubi', 'UTF-8') call is BINARY(1). 
However, the type of RexLiteral `X'6b7975756269'` is BINARY(6)). According to 
[FLINK-24419 title|https://issues.apache.org/jira/browse/FLINK-24419]

{code:java}
/* Example generated code for BINARY(2):

// legacy behavior
((byte[])(inputValue))

// new behavior
((((byte[])(inputValue)).length == 2) ? (((byte[])(inputValue))) : 
(java.util.Arrays.copyOf(((byte[])(inputValue)), 2)))

*/
{code}

The BINARY(6) is trimmed to BINARY(1) in BinaryToBinaryCastRule during the 
generation of cast call.

There are two optional solutions:
1. Apply the legacy cast behaviour with LegacyCastBehaviour.ENABLE in 
TableConfig when it is ENCODE call;
2. Make the output type of  `CAST(X'6b7975756269':BINARY(6)):BINARY(1)` to be 
BINARY(6) that equals to the type of RexLiteral `X'6b7975756269'`.


> ENCODE return error
> ---
>
> Key: FLINK-28882
> URL: https://issues.apache.org/jira/browse/FLINK-28882
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Luning Wang
>Priority: Major
> Attachments: screenshot-1.png
>
>
> Run the following in SQL Client, it will return 'k' rather than 'kyuubi' but 
> it returns 'kyuubi' in the 1.14 version.
> {code:java}
> select encode('kyuubi', 'UTF-8') {code}
>  





[jira] [Commented] (FLINK-28882) ENCODE return error

2022-09-05 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600556#comment-17600556
 ] 

Shuiqiang Chen commented on FLINK-28882:


I have encountered the same problem on the latest master branch (1.17-SNAPSHOT).
 !screenshot-1.png! 
As the screenshot shows, the result type of the ENCODE('kyuubi', 'UTF-8') call 
is BINARY(1), whereas the type of the RexLiteral `X'6b7975756269'` is BINARY(6). 
According to [FLINK-24419 title|https://issues.apache.org/jira/browse/FLINK-24419], 
the cast code is generated as follows:

{code:java}
/* Example generated code for BINARY(2):

// legacy behavior
((byte[])(inputValue))

// new behavior
((((byte[])(inputValue)).length == 2) ? (((byte[])(inputValue))) : 
(java.util.Arrays.copyOf(((byte[])(inputValue)), 2)))

*/
{code}

The BINARY(6) value is trimmed to BINARY(1) in BinaryToBinaryCastRule during the 
generation of the cast call.

There are two possible solutions:
1. Apply the legacy cast behaviour (LegacyCastBehaviour.ENABLE in 
TableConfig) when the call is ENCODE;
2. Make the output type of `CAST(X'6b7975756269':BINARY(6)):BINARY(1)` 
BINARY(6), which equals the type of the RexLiteral `X'6b7975756269'`.


> ENCODE return error
> ---
>
> Key: FLINK-28882
> URL: https://issues.apache.org/jira/browse/FLINK-28882
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / API, Table SQL / Planner
> Affects Versions: 1.15.0
> Reporter: Luning Wang
> Priority: Major
> Attachments: screenshot-1.png
>
>
> Running the following in the SQL Client returns 'k' rather than 'kyuubi', 
> whereas version 1.14 returns 'kyuubi'.
> {code:java}
> select encode('kyuubi', 'UTF-8') {code}
>  





[jira] [Updated] (FLINK-28882) ENCODE return error

2022-09-05 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-28882:
---
Attachment: screenshot-1.png






[jira] [Commented] (FLINK-28988) Incorrect result for filter after temporal join

2022-09-04 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600199#comment-17600199
 ] 

Shuiqiang Chen commented on FLINK-28988:


Hi [~jark], could you please help evaluate the solution and review the PR? 
Thanks!

> Incorrect result for filter after temporal join
> ---
>
> Key: FLINK-28988
> URL: https://issues.apache.org/jira/browse/FLINK-28988
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.15.1
> Reporter: Xuannan Su
> Assignee: Shuiqiang Chen
> Priority: Major
> Labels: pull-request-available
>
> The following code reproduces the issue:
>  
> {code:java}
> import static org.apache.flink.table.api.Expressions.$;
>
> import java.time.Instant;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> public class TemporalJoinSQLExample1 {
>     public static void main(String[] args) throws Exception {
>         // set up the Java DataStream API
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // set up the Java Table API
>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>         // versioned side: (id, state, ts)
>         final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
>                 env.fromElements(
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
>                         new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
>         final Table table =
>                 tableEnv.fromDataStream(
>                                 ds,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.STRING())
>                                         .column("f2", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f2", "f2 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "state", "ts");
>         tableEnv.createTemporaryView("source_table", table);
>
>         // keep only the latest row per id, which makes the view a versioned table
>         final Table dedupeTable =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM ("
>                                 + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
>                                 + ") WHERE row_num = 1");
>         tableEnv.createTemporaryView("versioned_table", dedupeTable);
>
>         // probe side: (id, ts)
>         final DataStreamSource<Tuple2<Integer, Instant>> event =
>                 env.fromElements(
>                         new Tuple2<>(0, Instant.ofEpochMilli(0)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(5)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(10)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(15)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(20)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(25)));
>         final Table eventTable =
>                 tableEnv.fromDataStream(
>                                 event,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f1", "f1 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "ts");
>         tableEnv.createTemporaryView("event_table", eventTable);
>
>         final Table result =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM event_table"
>                                 + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
>                                 + " ON event_table.id = versioned_table.id");
>         // temporal join without a filter
>         result.execute().print();
>         // the same join with a filter applied afterwards
>         result.filter($("state").isEqual("online")).execute().print();
>     }
> } {code}
>  
> The result of temporal join is the following:
> |op|id|ts|id0|state|ts0|row_num|
> |+I|0|1970-01-01 08:00:00.000|0|online|1970-01-01 08:00:00.000|1|
> |+I|0|1970-01-01 08:00:00.005|0|online|1970-01-01 08:00:00.000|1|
> |+I|0|1970-01-01 08:00:00.010|0|offline|1970-01-01 08:00:00.010|1|
> |+I|0|1970-01-01 08:00:00.015|0|offline|1970-01-01 08:00:00.010|1|
> |+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
> |+I|          

[jira] [Comment Edited] (FLINK-28988) Incorrect result for filter after temporal join

2022-09-04 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600135#comment-17600135
 ] 

Shuiqiang Chen edited comment on FLINK-28988 at 9/5/22 3:51 AM:


TL;DR: When the join is a temporal join, the filters in aboveFilter should not 
be pushed down into the right table.

When there is no filter after the temporal join, the query is explained as follows:

{code:xml}
== Abstract Syntax Tree ==
LogicalProject(id=[$0], ts=[$1], id0=[$2], state=[$3], ts0=[$4], row_num=[$5])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}])
   :- LogicalProject(id=[AS($0, _UTF-16LE'id')], ts=[AS($1, _UTF-16LE'ts')])
   :  +- LogicalWatermarkAssigner(rowtime=[f1], watermark=[-($1, 2000:INTERVAL SECOND)])
   :     +- LogicalTableScan(table=[[*anonymous_datastream_source$2*]])
   +- LogicalFilter(condition=[=($cor0.id, $0)])
      +- LogicalSnapshot(period=[$cor0.ts])
         +- LogicalProject(id=[$0], state=[$1], ts=[$2], row_num=[$3])
            +- LogicalFilter(condition=[=($3, 1)])
               +- LogicalProject(id=[AS($0, _UTF-16LE'id')], state=[AS($1, _UTF-16LE'state')], ts=[AS($2, _UTF-16LE'ts')], row_num=[ROW_NUMBER() OVER (PARTITION BY AS($0, _UTF-16LE'id') ORDER BY AS($2, _UTF-16LE'ts') DESC NULLS LAST)])
                  +- LogicalWatermarkAssigner(rowtime=[f2], watermark=[-($2, 2000:INTERVAL SECOND)])
                     +- LogicalTableScan(table=[[*anonymous_datastream_source$1*]])

== Optimized Physical Plan ==
Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num])
+- TemporalJoin(joinType=[LeftOuterJoin], where=[AND(=(id, id0), __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[f0 AS id, f1 AS ts])
   :     +- WatermarkAssigner(rowtime=[f1], watermark=[-(f1, 2000:INTERVAL SECOND)])
   :        +- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1:BIGINT AS row_num])
         +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME])
            +- Exchange(distribution=[hash[$0]])
               +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2])
                  +- WatermarkAssigner(rowtime=[f2], watermark=[-(f2, 2000:INTERVAL SECOND)])
                     +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2])

== Optimized Execution Plan ==
Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num])
+- TemporalJoin(joinType=[LeftOuterJoin], where=[((id = id0) AND __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[f0 AS id, f1 AS ts])
   :     +- WatermarkAssigner(rowtime=[f1], watermark=[(f1 - 2000:INTERVAL SECOND)])
   :        +- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1 AS row_num])
         +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME])
            +- Exchange(distribution=[hash[$0]])
               +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2])
                  +- WatermarkAssigner(rowtime=[f2], watermark=[(f2 - 2000:INTERVAL SECOND)])
                     +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2])
{code}

After FlinkChangelogModeInferenceProgram runs, the UpdateKindTrait of 
Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) becomes 
[ONLY_UPDATE_AFTER]. Therefore, at runtime, the 
rightSortedState in TemporalRowTimeJoinOperator contains the following rows:
[+I, 0, online, 1970-01-01 08:00:00.000]
[+I, 0, offline, 1970-01-01 08:00:00.010]
[+I, 0, online, 1970-01-01 08:00:00.020]

So we get the expected temporal join result:
[+I,0,1970-01-01 08:00:00.000,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.005,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.010,0,offline,1970-01-01 08:00:00.010,1]
[+I,0,1970-01-01 08:00:00.015,0,offline,1970-01-01 08:00:00.010,1]
[+I,0,1970-01-01 08:00:00.020,0,online,1970-01-01 08:00:00.020,1]
[+I,0,1970-01-01 08:00:00.025,0,online,1970-01-01 08:00:00.020,1]

However, if the filter is pushed down into the right table, the right sorted 
state becomes:
[+I, 0, online, 1970-01-01 08:00:00.000]
[+I, 0, online, 1970-01-01 08:00:00.020]

and the temporal join result becomes:
[+I,0,1970-01-01 08:00:00.000,0,online,1970-01-01 08:00:00.000,1]
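
The difference between the two states can be illustrated with a small plain-Java model. This is only a sketch under simplifying assumptions, not Flink's TemporalRowTimeJoinOperator: the class `TemporalJoinPushdownSketch` and its methods are hypothetical names, a `TreeMap` keyed by row time in milliseconds stands in for the right sorted state of id 0, and the temporal lookup is modelled as "latest version at or before the event time".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TemporalJoinPushdownSketch {

    // Right (versioned) side for id = 0: row time in ms -> state.
    static TreeMap<Long, String> versions() {
        TreeMap<Long, String> v = new TreeMap<>();
        v.put(0L, "online");
        v.put(10L, "offline");
        v.put(20L, "online");
        return v;
    }

    // Expected semantics: look up the version valid at each event time,
    // then apply the filter to the joined row.
    static List<Long> joinThenFilter(long[] events, TreeMap<Long, String> right, String wanted) {
        List<Long> matched = new ArrayList<>();
        for (long ts : events) {
            Map.Entry<Long, String> version = right.floorEntry(ts);
            if (version != null && wanted.equals(version.getValue())) {
                matched.add(ts);
            }
        }
        return matched;
    }

    // Pushed-down filter: versions that fail the filter never reach the state,
    // so later events still see the stale version that passed it.
    static List<Long> filterThenJoin(long[] events, TreeMap<Long, String> right, String wanted) {
        TreeMap<Long, String> filtered = new TreeMap<>();
        right.forEach((ts, state) -> {
            if (wanted.equals(state)) {
                filtered.put(ts, state);
            }
        });
        return joinThenFilter(events, filtered, wanted);
    }

    public static void main(String[] args) {
        long[] events = {0L, 5L, 10L, 15L, 20L, 25L};
        System.out.println(joinThenFilter(events, versions(), "online")); // [0, 5, 20, 25]
        System.out.println(filterThenJoin(events, versions(), "online")); // [0, 5, 10, 15, 20, 25]
    }
}
```

In this model the events at 10 ms and 15 ms are wrongly reported as "online" once the "offline" version is filtered out before it reaches the state, which is why the filter has to stay above the temporal join.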

[jira] [Updated] (FLINK-29146) User set job configuration can not be retrieved from JobGraph and ExecutionGraph

2022-08-31 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-29146:
---
Description: 
Currently, building an ExecutionGraph requires setting the job-specific 
information (such as job id, job name, and job configuration), most of which 
comes from the JobGraph. However, the configuration in the JobGraph is a new 
Configuration instance created when the JobGraph is built, and it does not 
contain any user-set configuration. As a result, we are not able to retrieve 
the user-specified job configuration from the ExecutionGraph built from the 
JobGraph at runtime.

BTW, in StreamExecutionEnvironment, it seems that job configurations that are 
not among the built-in options are ignored when calling 
StreamExecutionEnvironment.configure(ReadableConfig[, ClassLoader]). However, 
they are included when constructing a StreamExecutionEnvironment, which seems 
a bit inconsistent. Is this by design?

{code:java}
Configuration configuration = new Configuration();
// These configured strings take effect.
configuration.setString("k1", "v1");
configuration.setString("k2", "v2");
configuration.setString("k3", "v3");
configuration.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 30L);
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);

// These configured strings are ignored.
configuration.setString("k4", "v4");
configuration.setString("k5", "v5");
configuration.setString("k6", "v6");
env.configure(configuration);
{code}


  was:
Currently, building an ExecutionGraph requires setting the job-specific 
information (such as job id, job name, and job configuration), most of which 
comes from the JobGraph. However, the configuration in the JobGraph is a new 
Configuration instance that does not contain any user-set configuration. As a 
result, we are not able to retrieve the user-specified job configuration from 
the ExecutionGraph built from the JobGraph at runtime.

BTW, in StreamExecutionEnvironment, it seems that job configurations that are 
not among the built-in options are ignored when calling 
StreamExecutionEnvironment.configure(ReadableConfig[, ClassLoader]). However, 
they are included when constructing a StreamExecutionEnvironment, which seems 
a bit inconsistent.

{code:java}
Configuration configuration = new Configuration();
// These configured strings take effect.
configuration.setString("k1", "v1");
configuration.setString("k2", "v2");
configuration.setString("k3", "v3");
configuration.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 30L);
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);

// These configured strings are ignored.
configuration.setString("k4", "v4");
configuration.setString("k5", "v5");
configuration.setString("k6", "v6");
env.configure(configuration);
{code}



> User set job configuration can not be retirieved from JobGraph and 
> ExecutionGraph
> -
>
> Key: FLINK-29146
> URL: https://issues.apache.org/jira/browse/FLINK-29146
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Shuiqiang Chen
>Priority: Major
>
> Currently, building an ExecutionGraph requires setting the job-specific 
> information (such as the job id, job name, and job configuration), and most of 
> it comes from the JobGraph. But I find that the configuration in the JobGraph is a 
> new Configuration instance created when the JobGraph is built, and it does not 
> contain any user-set configuration. As a result, we are not able to retrieve the 
> user-specified job configuration in an ExecutionGraph built from a JobGraph at 
> runtime.
> BTW, in StreamExecutionEnvironment, it seems that job configurations that are not 
> contained in the built-in options will be ignored when calling 
> StreamExecutionEnvironment.configure(ReadableConfig[, ClassLoader]). However, 
> they will be included when constructing a StreamExecutionEnvironment, which 
> seems a bit inconsistent. Is it by design?
> {code:java}
> Configuration configuration = new Configuration();
> // These configured strings will take effect.
> configuration.setString("k1", "v1");
> configuration.setString("k2", "v2");
> configuration.setString("k3", "v3");
> configuration.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 30L);
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> // These configured strings will be ignored.
> configuration.setString("k4", "v4");
> configuration.setString("k5", "v5");
> configuration.setString("k6", "v6");
> env.configure(configuration);
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29146) User set job configuration can not be retirieved from JobGraph and ExecutionGraph

2022-08-30 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-29146:
--

 Summary: User set job configuration can not be retirieved from 
JobGraph and ExecutionGraph
 Key: FLINK-29146
 URL: https://issues.apache.org/jira/browse/FLINK-29146
 Project: Flink
  Issue Type: Bug
Reporter: Shuiqiang Chen


Currently, building an ExecutionGraph requires setting the job-specific 
information (such as the job id, job name, and job configuration), and most of 
it comes from the JobGraph. But I find that the configuration in the JobGraph is a new 
Configuration instance that does not contain any user-set configuration. As a 
result, we are not able to retrieve the user-specified job configuration in an 
ExecutionGraph built from a JobGraph at runtime.

BTW, in StreamExecutionEnvironment, it seems that job configurations that are not 
contained in the built-in options will be ignored when calling 
StreamExecutionEnvironment.configure(ReadableConfig[, ClassLoader]). However, 
they will be included when constructing a StreamExecutionEnvironment, which seems 
a bit inconsistent.

{code:java}
Configuration configuration = new Configuration();
// These configured strings will take effect.
configuration.setString("k1", "v1");
configuration.setString("k2", "v2");
configuration.setString("k3", "v3");
configuration.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 30L);
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);

// These configured strings will be ignored.
configuration.setString("k4", "v4");
configuration.setString("k5", "v5");
configuration.setString("k6", "v6");
env.configure(configuration);
{code}






[jira] [Commented] (FLINK-28988) Incorrect result for filter after temporal join

2022-08-29 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17597255#comment-17597255
 ] 

Shuiqiang Chen commented on FLINK-28988:


[~xuannan] Thank you for reporting the issue. I have also reproduced the output 
with the code snippet you provided on the latest master branch. I would like 
to dive deeper into the issue.

> Incorrect result for filter after temporal join
> ---
>
> Key: FLINK-28988
> URL: https://issues.apache.org/jira/browse/FLINK-28988
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Priority: Major
>
> The following code can reproduce the case
>  
> {code:java}
> public class TemporalJoinSQLExample1 {
>     public static void main(String[] args) throws Exception {
>         // set up the Java DataStream API
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // set up the Java Table API
>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
>                 env.fromElements(
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
>                         new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
>         final Table table =
>                 tableEnv.fromDataStream(
>                                 ds,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.STRING())
>                                         .column("f2", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f2", "f2 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "state", "ts");
>         tableEnv.createTemporaryView("source_table", table);
>         final Table dedupeTable =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM ("
>                                 + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
>                                 + ") WHERE row_num = 1");
>         tableEnv.createTemporaryView("versioned_table", dedupeTable);
>         DataStreamSource<Tuple2<Integer, Instant>> event =
>                 env.fromElements(
>                         new Tuple2<>(0, Instant.ofEpochMilli(0)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(5)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(10)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(15)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(20)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(25)));
>         final Table eventTable =
>                 tableEnv.fromDataStream(
>                                 event,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f1", "f1 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "ts");
>         tableEnv.createTemporaryView("event_table", eventTable);
>         final Table result =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM event_table"
>                                 + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
>                                 + " ON event_table.id = versioned_table.id");
>         result.execute().print();
>         result.filter($("state").isEqual("online")).execute().print();
>     }
> } {code}
>  
> The result of temporal join is the following:
> |op|id|ts|id0|state|ts0|row_num|
> |+I|0|1970-01-01 08:00:00.000|0|online|1970-01-01 08:00:00.000|1|
> |+I|0|1970-01-01 08:00:00.005|0|online|1970-01-01 08:00:00.000|1|
> |+I|0|1970-01-01 08:00:00.010|0|offline|1970-01-01 08:00:00.010|1|
> |+I|0|1970-01-01 08:00:00.015|0|offline|1970-01-01 08:00:00.010|1|
> |+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|

[jira] [Updated] (FLINK-28119) Update the document for Contribute Documentation

2022-06-19 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-28119:
---
Description: 
Following the [Contribute 
Documentation|https://flink.apache.org/contributing/contribute-documentation.html] 
guide, I found that Hugo must be installed before executing 
{code:sh}
cd docs
./build_docs.sh -p
{code}
Also, the web server is actually reachable at http://localhost:1313 

Maybe we should:
1. List the prerequisites before building the docs.
2. Correct the web server address.


  was:
Following the [Contribute 
Documentation|https://flink.apache.org/contributing/contribute-documentation.html],
 it requires me to install Hugo when executing 
{code:shell}
cd docs
./build_docs.sh -p
{code}
.
And the web server is actually accessed by http://localhost:1313 

Maybe we should:
1. List the prerequisites before building the doc.
2. Correct the web server address.



> Update the document for Contribute Documentation
> 
>
> Key: FLINK-28119
> URL: https://issues.apache.org/jira/browse/FLINK-28119
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.16.0
>Reporter: Shuiqiang Chen
>Priority: Minor
>
> Following the [Contribute 
> Documentation|https://flink.apache.org/contributing/contribute-documentation.html] 
> guide, I found that Hugo must be installed before executing 
> {code:sh}
> cd docs
> ./build_docs.sh -p
> {code}
> Also, the web server is actually reachable at http://localhost:1313 
> Maybe we should:
> 1. List the prerequisites before building the docs.
> 2. Correct the web server address.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28119) Update the document for Contribute Documentation

2022-06-19 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-28119:
--

 Summary: Update the document for Contribute Documentation
 Key: FLINK-28119
 URL: https://issues.apache.org/jira/browse/FLINK-28119
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.16.0
Reporter: Shuiqiang Chen


Following the [Contribute 
Documentation|https://flink.apache.org/contributing/contribute-documentation.html] 
guide, I found that Hugo must be installed before executing 
{code:shell}
cd docs
./build_docs.sh -p
{code}
Also, the web server is actually reachable at http://localhost:1313 

Maybe we should:
1. List the prerequisites before building the docs.
2. Correct the web server address.






[jira] [Commented] (FLINK-27159) Support first_value/last_value in the Table API

2022-06-14 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554388#comment-17554388
 ] 

Shuiqiang Chen commented on FLINK-27159:


[~dianfu] Yeah, I would like to align the built-in functions in the Table API 
with SQL to improve the user experience.

> Support first_value/last_value in the Table API
> ---
>
> Key: FLINK-27159
> URL: https://issues.apache.org/jira/browse/FLINK-27159
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Huang Xingbo
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
>
> Currently, first_value/last_value are not supported in the Table API.
> {code:python}
> table.group_by(col("a")) \
>.select(
>   col("a"),
>   call("FIRST_VALUE", col("b")))
> {code}





[jira] [Commented] (FLINK-27159) Support first_value/last_value in the Table API

2022-06-14 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554076#comment-17554076
 ] 

Shuiqiang Chen commented on FLINK-27159:


[~dianfu] Thank you for your attention. I have created a PR for this ticket. 
Would you mind helping to review it?

> Support first_value/last_value in the Table API
> ---
>
> Key: FLINK-27159
> URL: https://issues.apache.org/jira/browse/FLINK-27159
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Huang Xingbo
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
>
> Currently, first_value/last_value are not supported in the Table API.
> {code:python}
> table.group_by(col("a")) \
>.select(
>   col("a"),
>   call("FIRST_VALUE", col("b")))
> {code}





[jira] [Created] (FLINK-27881) The key(String) in PulsarMessageBuilder returns null

2022-06-02 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-27881:
--

 Summary: The key(String) in PulsarMessageBuilder returns null
 Key: FLINK-27881
 URL: https://issues.apache.org/jira/browse/FLINK-27881
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.15.0
Reporter: Shuiqiang Chen


PulsarMessageBuilder.key(String) always returns null, which might cause an NPE 
in later calls.





[jira] [Commented] (FLINK-27159) Support first_value/last_value in the Table API

2022-04-19 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17524188#comment-17524188
 ] 

Shuiqiang Chen commented on FLINK-27159:


Hi [~hxbks2ks], I'm interested in this ticket. Could you please assign it to me?

> Support first_value/last_value in the Table API
> ---
>
> Key: FLINK-27159
> URL: https://issues.apache.org/jira/browse/FLINK-27159
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Huang Xingbo
>Priority: Major
>
> Currently, first_value/last_value are not supported in the Table API.
> {code:python}
> table.group_by(col("a")) \
>.select(
>   col("a"),
>   call("FIRST_VALUE", col("b")))
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25231) Update PyFlink to use the new type system

2022-02-15 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17492438#comment-17492438
 ] 

Shuiqiang Chen commented on FLINK-25231:


[~dianfu] Sorry for the late reply. This issue is almost done in my local 
development branch, but some test cases fail because of an unexpected converted 
TypeInfo when applying TypeConversion.fromDataTypeToLegacyInfo(). I'm trying to 
find a solution to prevent the error as soon as I can. 

> Update PyFlink to use the new type system
> -
>
> Key: FLINK-25231
> URL: https://issues.apache.org/jira/browse/FLINK-25231
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Shuiqiang Chen
>Priority: Major
> Fix For: 1.15.0
>
>
> Currently, a lot of places in the PyFlink Table API still use the legacy type 
> system. We need to revisit them and migrate them to the new type system 
> (DataType/LogicalType).





[jira] [Commented] (FLINK-25231) Update PyFlink to use the new type system

2021-12-14 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17459611#comment-17459611
 ] 

Shuiqiang Chen commented on FLINK-25231:


[~dianfu] Thank you for reporting this improvement. I would like to take on 
this task, please assign it to me.

> Update PyFlink to use the new type system
> -
>
> Key: FLINK-25231
> URL: https://issues.apache.org/jira/browse/FLINK-25231
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Dian Fu
>Priority: Major
> Fix For: 1.15.0
>
>
> Currently, a lot of places in the PyFlink Table API still use the legacy type 
> system. We need to revisit them and migrate them to the new type system 
> (DataType/LogicalType).





[jira] [Created] (FLINK-21966) Support Kinesis connector in Python DataStream API.

2021-03-24 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-21966:
--

 Summary: Support Kinesis connector in Python DataStream API.
 Key: FLINK-21966
 URL: https://issues.apache.org/jira/browse/FLINK-21966
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Shuiqiang Chen
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21559) Python DataStreamTests::test_process_function failed on AZP

2021-03-03 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17295025#comment-17295025
 ] 

Shuiqiang Chen commented on FLINK-21559:


Thank you for reporting the issue, I am looking at it.

> Python DataStreamTests::test_process_function failed on AZP
> ---
>
> Key: FLINK-21559
> URL: https://issues.apache.org/jira/browse/FLINK-21559
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.13.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.13.0
>
>
> The Python test case {{DataStreamTests::test_process_function}} failed on AZP.
> {code}
> === short test summary info 
> 
> FAILED 
> pyflink/datastream/tests/test_data_stream.py::DataStreamTests::test_process_function
> = 1 failed, 705 passed, 22 skipped, 303 warnings in 583.39s (0:09:43) 
> ==
> ERROR: InvocationError for command /__w/3/s/flink-python/.tox/py38/bin/pytest 
> --durations=20 (exited with code 1)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=13992=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=8d78fe4f-d658-5c70-12f8-4921589024c3





[jira] [Created] (FLINK-21113) Add State access API for KeyedStream RuntimeContext.

2021-01-24 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-21113:
--

 Summary: Add State access API for  KeyedStream RuntimeContext.
 Key: FLINK-21113
 URL: https://issues.apache.org/jira/browse/FLINK-21113
 Project: Flink
  Issue Type: Sub-task
Reporter: Shuiqiang Chen


Add a state access API for the KeyedStream RuntimeContext so that users can get a 
ValueState/ListState/MapState via RuntimeContext.
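
The idea can be sketched in plain Python. This is only an illustration under stated assumptions, not the PyFlink API: `SimpleRuntimeContext`, `SimpleValueState`, and `running_count` are hypothetical names, and the real interfaces are whatever FLIP-153 defines. The sketch shows a value state that is obtained from a runtime context and always reads or writes the entry of the current key.

```python
from typing import Any, Dict, Hashable, List, Optional, Tuple


class SimpleValueState:
    """Hypothetical single-value state scoped to whichever key is current."""

    def __init__(self, store: Dict[Hashable, Any], context: "SimpleRuntimeContext") -> None:
        self._store = store
        self._context = context

    def value(self) -> Optional[Any]:
        # Returns None when nothing has been stored for the current key yet.
        return self._store.get(self._context.current_key)

    def update(self, new_value: Any) -> None:
        self._store[self._context.current_key] = new_value


class SimpleRuntimeContext:
    """Hypothetical context that hands out named states, one store per name."""

    def __init__(self) -> None:
        self.current_key: Optional[Hashable] = None
        self._stores: Dict[str, Dict[Hashable, Any]] = {}

    def get_state(self, name: str) -> SimpleValueState:
        return SimpleValueState(self._stores.setdefault(name, {}), self)


def running_count(events: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
    """Emit (key, count so far) for each event, keeping the count in keyed state."""
    ctx = SimpleRuntimeContext()
    count_state = ctx.get_state("count")
    results = []
    for key, _ in events:
        ctx.current_key = key  # a real runtime would set this before each call
        count = (count_state.value() or 0) + 1
        count_state.update(count)
        results.append((key, count))
    return results
```

Calling `running_count` on events for keys `a`, `b`, `a` keeps a separate counter per key, which is the kind of stateful operation on a KeyedStream that this sub-task targets.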





[jira] [Updated] (FLINK-21111) Support State access in Python DataStream API

2021-01-24 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-21111:
---
Affects Version/s: (was: 1.13.0)

> Support State access in Python DataStream API
> -
>
> Key: FLINK-21111
> URL: https://issues.apache.org/jira/browse/FLINK-21111
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Shuiqiang Chen
>Priority: Major
> Fix For: 1.13.0
>
>
> This is the umbrella Jira for FLIP-153, which intends to support state access 
> in the Python DataStream API.
> FLIP wiki page: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
> In FLIP-130, we added the stateless Python DataStream API so that users are 
> able to perform some basic data transformations. To implement more complex 
> data processing, we need to provide state access support. In this doc, I 
> propose adding state access to the Python DataStream API to support stateful 
> operations on a KeyedStream.





[jira] [Updated] (FLINK-21111) Support State access in Python DataStream API

2021-01-24 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-21111:
---
Fix Version/s: 1.13.0

> Support State access in Python DataStream API
> -
>
> Key: FLINK-21111
> URL: https://issues.apache.org/jira/browse/FLINK-21111
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.13.0
>Reporter: Shuiqiang Chen
>Priority: Major
> Fix For: 1.13.0
>
>
> This is the umbrella Jira for FLIP-153, which intends to support state access 
> in the Python DataStream API.
> FLIP wiki page: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
> In FLIP-130, we added the stateless Python DataStream API so that users are 
> able to perform some basic data transformations. To implement more complex 
> data processing, we need to provide state access support. In this doc, I 
> propose adding state access to the Python DataStream API to support stateful 
> operations on a KeyedStream.





[jira] [Created] (FLINK-21112) Add ValueState/ListState/MapState and corresponding StateDescriptors for Python DataStream API

2021-01-24 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-21112:
--

 Summary: Add ValueState/ListState/MapState and corresponding 
StateDescriptors for Python DataStream API
 Key: FLINK-21112
 URL: https://issues.apache.org/jira/browse/FLINK-21112
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Shuiqiang Chen
 Fix For: 1.13.0








[jira] [Created] (FLINK-21111) Support State access in Python DataStream API

2021-01-24 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-21111:
--

 Summary: Support State access in Python DataStream API
 Key: FLINK-21111
 URL: https://issues.apache.org/jira/browse/FLINK-21111
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.13.0
Reporter: Shuiqiang Chen


This is the umbrella Jira for FLIP-153, which intends to support state access 
in the Python DataStream API.

FLIP wiki page: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API

In FLIP-130, we added the stateless Python DataStream API so that users are 
able to perform some basic data transformations. To implement more complex 
data processing, we need to provide state access support. In this doc, I 
propose adding state access to the Python DataStream API to support stateful 
operations on a KeyedStream.





[jira] [Created] (FLINK-20720) Add documentation for ProcessFunction in Python DataStream API

2020-12-22 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-20720:
--

 Summary: Add documentation for ProcessFunction in Python 
DataStream API
 Key: FLINK-20720
 URL: https://issues.apache.org/jira/browse/FLINK-20720
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Documentation
Reporter: Shuiqiang Chen
 Fix For: 1.13.0








[jira] [Commented] (FLINK-20715) pyflink DataStream filter error.

2020-12-22 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17253388#comment-17253388
 ] 

Shuiqiang Chen commented on FLINK-20715:


Hi [~kingreatwill], thank you for reporting the issue. Is the error above the 
full stack trace? I tested the code snippet you gave and it finished 
successfully.

> pyflink DataStream filter error.
> 
>
> Key: FLINK-20715
> URL: https://issues.apache.org/jira/browse/FLINK-20715
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, API / Python
>Affects Versions: 1.12.0
> Environment: Flink 1.12
>Reporter: Enter
>Priority: Major
>
> ```
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import FilterFunction
>
>
> class MyFilterFunction(FilterFunction):
>     def filter(self, value):
>         # keep only rows whose first field is even
>         return value[0] % 2 == 0
>
>
> def demo_stream():
>     see = StreamExecutionEnvironment.get_execution_environment()
>     see.set_parallelism(1)
>     ds = see.from_collection(
>         [(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')],
>         type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))
>     ds.filter(MyFilterFunction()).print()
>     ds.print()
>     # execute the job
>     see.execute('job1')
>
>
> if __name__ == '__main__':
>     demo_stream()
> ```
>  
> raise Py4JError(
> py4j.protocol.Py4JError: An error occurred while calling o0.__getstate__. Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method __getstate__([]) does not exist
>  at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>  at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>  at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
>  at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>  at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>  at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>  at java.lang.Thread.run(Thread.java:748)





[jira] [Created] (FLINK-20647) Use yield to generate output datas in ProcessFunction for Python DataStream

2020-12-17 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-20647:
--

 Summary: Use yield to generate output datas in ProcessFunction for 
Python DataStream
 Key: FLINK-20647
 URL: https://issues.apache.org/jira/browse/FLINK-20647
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Shuiqiang Chen
 Fix For: 1.13.0


Currently, users need to use a `collector` to collect the output of their 
ProcessFunction. We should use yield instead: it is more efficient because it 
avoids using an array buffer to store the collected data, and it is more 
Pythonic as well.
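
The difference between the two styles can be sketched in plain Python. This is a simplified illustration rather than PyFlink code: `ListCollector`, `process_with_collector`, `process_with_yield`, and the two `run_*` helpers are hypothetical names, and a real ProcessFunction also receives a context argument that is omitted here.

```python
from typing import Iterable, Iterator, List


class ListCollector:
    """Hypothetical collector that buffers every emitted element in a list."""

    def __init__(self) -> None:
        self.buffer: List[str] = []

    def collect(self, value: str) -> None:
        self.buffer.append(value)


def process_with_collector(value: str, out: ListCollector) -> None:
    # Collector style: results are pushed into a buffer owned by the caller.
    for word in value.split():
        out.collect(word.upper())


def process_with_yield(value: str) -> Iterator[str]:
    # Generator style: results are produced lazily, one at a time.
    for word in value.split():
        yield word.upper()


def run_with_collector(inputs: Iterable[str]) -> List[str]:
    out = ListCollector()
    for value in inputs:
        process_with_collector(value, out)
    return out.buffer


def run_with_yield(inputs: Iterable[str]) -> Iterator[str]:
    # No intermediate buffer: each result flows straight to the consumer.
    for value in inputs:
        yield from process_with_yield(value)
```

Both variants produce the same elements in the same order. The generator version simply never materializes them in an intermediate list unless the consumer chooses to.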





[jira] [Created] (FLINK-20621) Refactor the TypeInformation implementation in Python DataStream API

2020-12-16 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-20621:
--

 Summary: Refactor the TypeInformation implementation in Python 
DataStream API
 Key: FLINK-20621
 URL: https://issues.apache.org/jira/browse/FLINK-20621
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Shuiqiang Chen
 Fix For: 1.12.0


Currently, when users declare a TypeInfo in a Python DataStream job, a Java 
TypeInfo instance is created as soon as the Python object is instantiated. We 
should make this lazy, so that the Java TypeInfo instance is created only when 
TypeInformation.get_java_type_info() is called.
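
A minimal sketch of the lazy style in plain Python follows. It is an assumption about the shape of the change, not the actual PyFlink implementation: `LazyTypeInfo` is a hypothetical class, and `factory` stands in for whatever gateway call builds the Java-side object.

```python
from typing import Callable, Optional


class LazyTypeInfo:
    """Hypothetical wrapper that defers building the Java-side object."""

    def __init__(self, factory: Callable[[], object]) -> None:
        # Only remember how to build the Java object; do not build it yet.
        self._factory = factory
        self._j_typeinfo: Optional[object] = None

    def get_java_type_info(self) -> object:
        # Build the Java-side object on first access and cache it afterwards.
        if self._j_typeinfo is None:
            self._j_typeinfo = self._factory()
        return self._j_typeinfo
```

With this shape, declaring a type costs nothing on the Java side, and repeated calls to `get_java_type_info()` reuse the cached instance.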





[jira] [Comment Edited] (FLINK-20123) Test native support of PyFlink on Kubernetes

2020-11-25 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17239070#comment-17239070
 ] 

Shuiqiang Chen edited comment on FLINK-20123 at 11/26/20, 5:27 AM:
---

Hi [~rmetzger] I found that the native k8s cluster could not be unregistered 
when executing Python DataStream application in attach mode during test. I have 
created a [JIRA|https://issues.apache.org/jira/browse/FLINK-20365] for it and 
will fix it today.


was (Author: csq):
Hi [~rmetzger] I found that the native k8s cluster could not be unregistered 
when executing Python DataStream application in attach mode during test. I have 
created a [JIRA|https://issues.apache.org/jira/browse/FLINK-20365] for it and 
will fixed it today.

> Test native support of PyFlink on Kubernetes
> 
>
> Key: FLINK-20123
> URL: https://issues.apache.org/jira/browse/FLINK-20123
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Assignee: Shuiqiang Chen
>Priority: Critical
> Fix For: 1.12.0
>
>
> 
> [General Information about the Flink 1.12 release 
> testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing]
> When testing a feature, consider the following aspects:
> - Is the documentation easy to understand
> - Are the error messages, log messages, APIs etc. easy to understand
> - Is the feature working as expected under normal conditions
> - Is the feature working / failing as expected with invalid input, induced 
> errors etc.
> If you find a problem during testing, please file a ticket 
> (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket.
> During the testing keep us updated on tests conducted, or please write a 
> short summary of all things you have tested in the end.





[jira] [Comment Edited] (FLINK-20123) Test native support of PyFlink on Kubernetes

2020-11-25 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17239070#comment-17239070
 ] 

Shuiqiang Chen edited comment on FLINK-20123 at 11/26/20, 5:27 AM:
---

Hi [~rmetzger] I found that the native k8s cluster could not be unregistered 
when executing Python DataStream application in attach mode during test. I have 
created a [JIRA|https://issues.apache.org/jira/browse/FLINK-20365] for it and 
will fixed it today.


was (Author: csq):
Hi [~rmetzger] I found that the native k8s cluster could not be unregistered 
when executing Python DataStream application in attach mode during test. I have 
created a [JIRA|https://issues.apache.org/jira/browse/FLINK-20365] for and will 
fixed it today.

> Test native support of PyFlink on Kubernetes
> 
>
> Key: FLINK-20123
> URL: https://issues.apache.org/jira/browse/FLINK-20123
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Assignee: Shuiqiang Chen
>Priority: Critical
> Fix For: 1.12.0
>
>
> 
> [General Information about the Flink 1.12 release 
> testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing]
> When testing a feature, consider the following aspects:
> - Is the documentation easy to understand
> - Are the error messages, log messages, APIs etc. easy to understand
> - Is the feature working as expected under normal conditions
> - Is the feature working / failing as expected with invalid input, induced 
> errors etc.
> If you find a problem during testing, please file a ticket 
> (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket.
> During the testing keep us updated on tests conducted, or please write a 
> short summary of all things you have tested in the end.





[jira] [Updated] (FLINK-20365) The native k8s cluster could not be unregistered when executing Python DataStream application attachedly.

2020-11-25 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-20365:
---
Affects Version/s: (was: 1.12.0)

> The native k8s cluster could not be unregistered when executing Python 
> DataStream application attachedly.
> -
>
> Key: FLINK-20365
> URL: https://issues.apache.org/jira/browse/FLINK-20365
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Shuiqiang Chen
>Priority: Major
>






[jira] [Updated] (FLINK-20365) The native k8s cluster could not be unregistered when executing Python DataStream application attachedly.

2020-11-25 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-20365:
---
Fix Version/s: 1.12.0

> The native k8s cluster could not be unregistered when executing Python 
> DataStream application attachedly.
> -
>
> Key: FLINK-20365
> URL: https://issues.apache.org/jira/browse/FLINK-20365
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Shuiqiang Chen
>Priority: Major
> Fix For: 1.12.0
>
>






[jira] [Commented] (FLINK-20123) Test native support of PyFlink on Kubernetes

2020-11-25 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17239070#comment-17239070
 ] 

Shuiqiang Chen commented on FLINK-20123:


Hi [~rmetzger], I found during testing that the native k8s cluster could not be 
unregistered when executing a Python DataStream application in attach mode. I 
have created a [JIRA|https://issues.apache.org/jira/browse/FLINK-20365] for it 
and will fix it today.

> Test native support of PyFlink on Kubernetes
> 
>
> Key: FLINK-20123
> URL: https://issues.apache.org/jira/browse/FLINK-20123
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Assignee: Shuiqiang Chen
>Priority: Critical
> Fix For: 1.12.0
>
>
> 
> [General Information about the Flink 1.12 release 
> testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing]
> When testing a feature, consider the following aspects:
> - Is the documentation easy to understand
> - Are the error messages, log messages, APIs etc. easy to understand
> - Is the feature working as expected under normal conditions
> - Is the feature working / failing as expected with invalid input, induced 
> errors etc.
> If you find a problem during testing, please file a ticket 
> (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket.
> During the testing keep us updated on tests conducted, or please write a 
> short summary of all things you have tested in the end.





[jira] [Created] (FLINK-20365) The native k8s cluster could not be unregistered when executing Python DataStream application attachedly.

2020-11-25 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-20365:
--

 Summary: The native k8s cluster could not be unregistered when 
executing Python DataStream application attachedly.
 Key: FLINK-20365
 URL: https://issues.apache.org/jira/browse/FLINK-20365
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.12.0
Reporter: Shuiqiang Chen








[jira] [Comment Edited] (FLINK-20199) PyFlink e2e DataStream test failed during stopping kafka services

2020-11-22 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237087#comment-17237087
 ] 

Shuiqiang Chen edited comment on FLINK-20199 at 11/23/20, 3:38 AM:
---

[~hxbks2ks] Thank you for reporting this issue. It seems that it failed to stop 
the Kafka process after the DataStream job finished. I'll keep an eye on this 
problem.


was (Author: csq):
[~hxbks2ks] Thank you for reporting this issue. Because of the testing 
environment's instability, it failed to stop the Kafka process after the 
DataStream job finished.

> PyFlink e2e DataStream test failed during stopping kafka services
> -
>
> Key: FLINK-20199
> URL: https://issues.apache.org/jira/browse/FLINK-20199
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Huang Xingbo
>Priority: Major
> Fix For: 1.12.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9667=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529]
>  
> {code}
> Nov 17 06:04:35 Reading kafka messages... 
> Nov 17 06:05:17 Cancelling job 55acfb14436b24b9c6fb0f47d7297f83. 
> Nov 17 06:05:17 Cancelled job 55acfb14436b24b9c6fb0f47d7297f83. 
> Nov 17 06:05:18 Stopping taskexecutor daemon (pid: 110438) on host 
> fv-az598-520. 
> Nov 17 06:05:24 Stopping standalonesession daemon (pid: 110130) on host 
> fv-az598-520. 
> Nov 17 06:05:25 Waiting till process is stopped: pid = 123127 pattern = 
> 'kafka' 
> Nov 17 06:05:26 Waiting till process is stopped: pid = 123717 pattern = 
> 'kafka'
> {code}





[jira] [Comment Edited] (FLINK-19951) PyFlink end-to-end test stuck in "Reading kafka messages"

2020-11-22 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237090#comment-17237090
 ] 

Shuiqiang Chen edited comment on FLINK-19951 at 11/23/20, 3:32 AM:
---

Hi everyone, as illustrated in the comments above, I have added a timeout 
for reading the output from Kafka so that the test won't get stuck forever, 
as part of [this pr|https://github.com/apache/flink/pull/14068]. It will also 
print the JM and TM logs for debugging purposes if the data read is not as 
expected. 

Will revisit this issue when it fails again (with more logs).

Regarding the instance reported by [~xintongsong], I think it's not related 
to this issue and is already tracked in FLINK-20199.
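
For illustration only, the idea of bounding the read with a timeout can be sketched in Python as below. This is not the code from the PR; `read_with_timeout` and the `poll` callable are hypothetical names, and `poll` is assumed to return None when no message is available yet.

```python
import time
from typing import Callable, List, Optional


def read_with_timeout(
    poll: Callable[[], Optional[str]],
    expected_count: int,
    timeout_seconds: float,
    poll_interval: float = 0.1,
) -> List[str]:
    """Collect expected_count messages, or raise once the deadline has passed."""
    # Use a monotonic clock so wall-clock adjustments cannot extend the wait.
    deadline = time.monotonic() + timeout_seconds
    messages: List[str] = []
    while len(messages) < expected_count:
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"read {len(messages)} of {expected_count} messages "
                f"within {timeout_seconds}s"
            )
        message = poll()  # hypothetical source; None means "nothing yet"
        if message is None:
            time.sleep(poll_interval)
        else:
            messages.append(message)
    return messages
```

Raising instead of waiting forever gives the test a place to dump the JM and TM logs before it exits.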


was (Author: csq):
Hi everyone, as illustrated in the comments above, I have added a timeout 
for reading the output from Kafka so that the test won't get stuck forever. 
It will also print the JM and TM logs for debugging purposes if the data read 
is not as expected. Please refer to [this 
pr|https://github.com/apache/flink/pull/14068].

Overall, to figure out the root cause, we need to observe the test for a 
little while with more debug information for test failures.

> PyFlink end-to-end test stuck in "Reading kafka messages"
> -
>
> Key: FLINK-19951
> URL: https://issues.apache.org/jira/browse/FLINK-19951
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8837=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529
> {code}
> 2020-11-03T08:18:10.2935249Z Nov 03 08:18:10 Test PyFlink DataStream job:
> 2020-11-03T08:18:10.2936216Z Nov 03 08:18:10 Preparing Kafka...
> 2020-11-03T08:18:10.2948091Z Nov 03 08:18:10 Downloading Kafka from 
> https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
> 2020-11-03T08:18:10.3024006Z   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
> 2020-11-03T08:18:10.3024610Z                                  Dload  Upload   Total   Spent    Left  Speed
> 2020-11-03T08:18:10.3024891Z 
> 2020-11-03T08:18:10.6563956Z   0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
> 2020-11-03T08:18:11.6568328Z   0 54.3M    0 32768    0     0  92275      0  0:10:18 --:--:--  0:10:18 92044
> 2020-11-03T08:18:12.6540430Z  11 54.3M   11 6272k    0     0  4626k      0  0:00:12  0:00:01  0:00:11 4625k
> 2020-11-03T08:18:13.6585146Z  23 54.3M   23 12.6M    0     0  5521k      0  0:00:10  0:00:02  0:00:08 5521k
> 2020-11-03T08:18:14.6558377Z  36 54.3M   36 19.7M    0     0  6018k      0  0:00:09  0:00:03  0:00:06 6017k
> 2020-11-03T08:18:15.6593118Z  49 54.3M   49 26.7M    0     0  6297k      0  0:00:08  0:00:04  0:00:04 6297k
> 2020-11-03T08:18:16.653Z  62 54.3M   62 34.0M    0     0  6515k      0  0:00:08  0:00:05  0:00:03 6973k
> 2020-11-03T08:18:17.6544951Z  76 54.3M   76 41.8M    0     0  6747k      0  0:00:08  0:00:06  0:00:02 7322k
> 2020-11-03T08:18:18.2448109Z  91 54.3M   91 49.7M    0     0  6923k      0  0:00:08  0:00:07  0:00:01 7584k
> 2020-11-03T08:18:18.2450531Z 100 54.3M  100 54.3M    0     0  7010k      0  0:00:07  0:00:07 --:--:-- 7737k
> 2020-11-03T08:18:20.2751451Z Nov 03 08:18:20 Zookeeper Server has been 
> started ...
> 2020-11-03T08:18:22.0064118Z Nov 03 08:18:22 Waiting for broker...
> 2020-11-03T08:18:25.4758082Z Nov 03 08:18:25 Created topic 
> test-python-data-stream-source.
> 2020-11-03T08:18:25.8324767Z Nov 03 08:18:25 Sending messages to Kafka...
> 2020-11-03T08:18:35.2954788Z Nov 03 08:18:35 >>Created topic 
> test-python-data-stream-sink.
> 2020-11-03T08:18:54.8314099Z Nov 03 08:18:54 Job has been submitted with 
> JobID 1b0c317b47c69ee600937e1715ad9cce
> 2020-11-03T08:18:54.8348757Z Nov 03 08:18:54 Reading kafka messages...
> 2020-11-03T08:53:10.5246998Z 
> ==
> 2020-11-03T08:53:10.5249381Z === WARNING: This E2E Run took already 80% of 
> the allocated time budget of 250 minutes ===
> 2020-11-03T08:53:10.5251343Z 
> ==
> {code}





[jira] [Commented] (FLINK-19951) PyFlink end-to-end test stuck in "Reading kafka messages"

2020-11-22 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237090#comment-17237090
 ] 

Shuiqiang Chen commented on FLINK-19951:


Hi everyone, as illustrated in the comments above, I have added a timeout 
for reading the output from Kafka so that the test won't get stuck forever. 
It will also print the JM and TM logs for debugging purposes if the data read 
is not as expected. Please refer to [this 
pr|https://github.com/apache/flink/pull/14068].

Overall, to figure out the root cause, we need to observe the test for a 
little while with more debug information for test failures.

> PyFlink end-to-end test stuck in "Reading kafka messages"
> -
>
> Key: FLINK-19951
> URL: https://issues.apache.org/jira/browse/FLINK-19951
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8837=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529
> {code}
> 2020-11-03T08:18:10.2935249Z Nov 03 08:18:10 Test PyFlink DataStream job:
> 2020-11-03T08:18:10.2936216Z Nov 03 08:18:10 Preparing Kafka...
> 2020-11-03T08:18:10.2948091Z Nov 03 08:18:10 Downloading Kafka from 
> https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
> 2020-11-03T08:18:10.3024006Z   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
> 2020-11-03T08:18:10.3024610Z                                  Dload  Upload   Total   Spent    Left  Speed
> 2020-11-03T08:18:10.3024891Z 
> 2020-11-03T08:18:10.6563956Z   0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
> 2020-11-03T08:18:11.6568328Z   0 54.3M    0 32768    0     0  92275      0  0:10:18 --:--:--  0:10:18 92044
> 2020-11-03T08:18:12.6540430Z  11 54.3M   11 6272k    0     0  4626k      0  0:00:12  0:00:01  0:00:11 4625k
> 2020-11-03T08:18:13.6585146Z  23 54.3M   23 12.6M    0     0  5521k      0  0:00:10  0:00:02  0:00:08 5521k
> 2020-11-03T08:18:14.6558377Z  36 54.3M   36 19.7M    0     0  6018k      0  0:00:09  0:00:03  0:00:06 6017k
> 2020-11-03T08:18:15.6593118Z  49 54.3M   49 26.7M    0     0  6297k      0  0:00:08  0:00:04  0:00:04 6297k
> 2020-11-03T08:18:16.653Z  62 54.3M   62 34.0M    0     0  6515k      0  0:00:08  0:00:05  0:00:03 6973k
> 2020-11-03T08:18:17.6544951Z  76 54.3M   76 41.8M    0     0  6747k      0  0:00:08  0:00:06  0:00:02 7322k
> 2020-11-03T08:18:18.2448109Z  91 54.3M   91 49.7M    0     0  6923k      0  0:00:08  0:00:07  0:00:01 7584k
> 2020-11-03T08:18:18.2450531Z 100 54.3M  100 54.3M    0     0  7010k      0  0:00:07  0:00:07 --:--:-- 7737k
> 2020-11-03T08:18:20.2751451Z Nov 03 08:18:20 Zookeeper Server has been 
> started ...
> 2020-11-03T08:18:22.0064118Z Nov 03 08:18:22 Waiting for broker...
> 2020-11-03T08:18:25.4758082Z Nov 03 08:18:25 Created topic 
> test-python-data-stream-source.
> 2020-11-03T08:18:25.8324767Z Nov 03 08:18:25 Sending messages to Kafka...
> 2020-11-03T08:18:35.2954788Z Nov 03 08:18:35 >>Created topic 
> test-python-data-stream-sink.
> 2020-11-03T08:18:54.8314099Z Nov 03 08:18:54 Job has been submitted with 
> JobID 1b0c317b47c69ee600937e1715ad9cce
> 2020-11-03T08:18:54.8348757Z Nov 03 08:18:54 Reading kafka messages...
> 2020-11-03T08:53:10.5246998Z 
> ==
> 2020-11-03T08:53:10.5249381Z === WARNING: This E2E Run took already 80% of 
> the allocated time budget of 250 minutes ===
> 2020-11-03T08:53:10.5251343Z 
> ==
> {code}





[jira] [Commented] (FLINK-20199) PyFlink unstable e2e DataStream test

2020-11-22 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237087#comment-17237087
 ] 

Shuiqiang Chen commented on FLINK-20199:


[~hxbks2ks] Thank you for reporting this issue. Because of the testing 
environment's instability, it failed to stop the Kafka process after the 
DataStream job finished.

> PyFlink unstable e2e DataStream test
> 
>
> Key: FLINK-20199
> URL: https://issues.apache.org/jira/browse/FLINK-20199
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Huang Xingbo
>Priority: Major
> Fix For: 1.12.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9667=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529]
>  
> {code}
> Nov 17 06:04:35 Reading kafka messages... 
> Nov 17 06:05:17 Cancelling job 55acfb14436b24b9c6fb0f47d7297f83. 
> Nov 17 06:05:17 Cancelled job 55acfb14436b24b9c6fb0f47d7297f83. 
> Nov 17 06:05:18 Stopping taskexecutor daemon (pid: 110438) on host 
> fv-az598-520. 
> Nov 17 06:05:24 Stopping standalonesession daemon (pid: 110130) on host 
> fv-az598-520. 
> Nov 17 06:05:25 Waiting till process is stopped: pid = 123127 pattern = 
> 'kafka' 
> Nov 17 06:05:26 Waiting till process is stopped: pid = 123717 pattern = 
> 'kafka'
> {code}





[jira] [Updated] (FLINK-20199) PyFlink unstable e2e DataStream test

2020-11-22 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-20199:
---
Description: 
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9667=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529]

 

{code}
Nov 17 06:04:35 Reading kafka messages... 
Nov 17 06:05:17 Cancelling job 55acfb14436b24b9c6fb0f47d7297f83. 
Nov 17 06:05:17 Cancelled job 55acfb14436b24b9c6fb0f47d7297f83. 
Nov 17 06:05:18 Stopping taskexecutor daemon (pid: 110438) on host 
fv-az598-520. 
Nov 17 06:05:24 Stopping standalonesession daemon (pid: 110130) on host 
fv-az598-520. 
Nov 17 06:05:25 Waiting till process is stopped: pid = 123127 pattern = 'kafka' 
Nov 17 06:05:26 Waiting till process is stopped: pid = 123717 pattern = 'kafka'
{code}

  
was:https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9667=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529


> PyFlink unstable e2e DataStream test
> 
>
> Key: FLINK-20199
> URL: https://issues.apache.org/jira/browse/FLINK-20199
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Huang Xingbo
>Priority: Major
> Fix For: 1.12.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9667=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529]
>  
> {code}
> Nov 17 06:04:35 Reading kafka messages... 
> Nov 17 06:05:17 Cancelling job 55acfb14436b24b9c6fb0f47d7297f83. 
> Nov 17 06:05:17 Cancelled job 55acfb14436b24b9c6fb0f47d7297f83. 
> Nov 17 06:05:18 Stopping taskexecutor daemon (pid: 110438) on host 
> fv-az598-520. 
> Nov 17 06:05:24 Stopping standalonesession daemon (pid: 110130) on host 
> fv-az598-520. 
> Nov 17 06:05:25 Waiting till process is stopped: pid = 123127 pattern = 
> 'kafka' 
> Nov 17 06:05:26 Waiting till process is stopped: pid = 123717 pattern = 
> 'kafka'
> {code}





[jira] [Issue Comment Deleted] (FLINK-20199) PyFlink unstable e2e DataStream test

2020-11-22 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-20199:
---
Comment: was deleted

(was: Thank you for reporting the issue. It is related to [this 
ticket|https://issues.apache.org/jira/browse/FLINK-19951])

> PyFlink unstable e2e DataStream test
> 
>
> Key: FLINK-20199
> URL: https://issues.apache.org/jira/browse/FLINK-20199
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Huang Xingbo
>Priority: Major
> Fix For: 1.12.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9667=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529]
>  
> {code}
> Nov 17 06:04:35 Reading kafka messages... 
> Nov 17 06:05:17 Cancelling job 55acfb14436b24b9c6fb0f47d7297f83. 
> Nov 17 06:05:17 Cancelled job 55acfb14436b24b9c6fb0f47d7297f83. 
> Nov 17 06:05:18 Stopping taskexecutor daemon (pid: 110438) on host 
> fv-az598-520. 
> Nov 17 06:05:24 Stopping standalonesession daemon (pid: 110130) on host 
> fv-az598-520. 
> Nov 17 06:05:25 Waiting till process is stopped: pid = 123127 pattern = 
> 'kafka' 
> Nov 17 06:05:26 Waiting till process is stopped: pid = 123717 pattern = 
> 'kafka'
> {code}





[jira] [Commented] (FLINK-20199) PyFlink unstable e2e DataStream test

2020-11-22 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237067#comment-17237067
 ] 

Shuiqiang Chen commented on FLINK-20199:


Thank you for reporting the issue. It is related to [this 
ticket|https://issues.apache.org/jira/browse/FLINK-19951]

> PyFlink unstable e2e DataStream test
> 
>
> Key: FLINK-20199
> URL: https://issues.apache.org/jira/browse/FLINK-20199
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Huang Xingbo
>Priority: Major
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9667=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529





[jira] [Comment Edited] (FLINK-20135) Test Python DataStream API

2020-11-22 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237055#comment-17237055
 ] 

Shuiqiang Chen edited comment on FLINK-20135 at 11/23/20, 1:40 AM:
---

The following APIs have been tested, and most of them are functioning well.
|rebalance|
|forward|
|broadcast|
|process|
|assign_timestamp_and_watermark|
|partition_custom|
|add_sink|
|StreamExecutionEnvironment|
|from_collection|
|execute_async|
|add_source|
|add_python_file|
|set_python_requirement|
|set_python_archive|
|set_python_executable|

These two APIs do not work as expected:
|add_jars|
|add_classpaths|

I have created a Jira ticket and a PR for it: 

https://issues.apache.org/jira/browse/FLINK-20275


was (Author: csq):
The following APIs have been tested, and most of them are functioning well.
|rebalance|
|forward|
|broadcast|
|process|
|assign_timestamp_and_watermark|
|partition_custom|
|add_sink|

|StreamExecutionEnvironment|
|from_collection|
|execute_async|
|add_source|
|add_python_file|
|set_python_requirement|
|set_python_archive|
|set_python_executable|

These two APIs do not work as expected:
|add_jars|
|add_classpaths|

I have created a Jira ticket and a PR for them: 

https://issues.apache.org/jira/browse/FLINK-20275

> Test Python DataStream API
> --
>
> Key: FLINK-20135
> URL: https://issues.apache.org/jira/browse/FLINK-20135
> Project: Flink
>  Issue Type: Task
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Assignee: Shuiqiang Chen
>Priority: Critical
> Fix For: 1.12.0
>
>






[jira] [Commented] (FLINK-20135) Test Python DataStream API

2020-11-22 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237055#comment-17237055
 ] 

Shuiqiang Chen commented on FLINK-20135:


The following APIs have been tested, and most of them are functioning well.
|rebalance|
|forward|
|broadcast|
|process|
|assign_timestamp_and_watermark|
|partition_custom|
|add_sink|

|StreamExecutionEnvironment|
|from_collection|
|execute_async|
|add_source|
|add_python_file|
|set_python_requirement|
|set_python_archive|
|set_python_executable|

These two APIs do not work as expected:
|add_jars|
|add_classpaths|

I have created a Jira ticket and a PR for them: 

https://issues.apache.org/jira/browse/FLINK-20275

> Test Python DataStream API
> --
>
> Key: FLINK-20135
> URL: https://issues.apache.org/jira/browse/FLINK-20135
> Project: Flink
>  Issue Type: Task
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Assignee: Shuiqiang Chen
>Priority: Critical
> Fix For: 1.12.0
>
>






[jira] [Updated] (FLINK-20275) The path delimiter for add_jars and add_classpaths in Python StreamExecutionEnvironment should be ;

2020-11-22 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-20275:
---
Component/s: API / Python

> The path delimiter for add_jars and add_classpaths in Python 
> StreamExecutionEnvironment should be ; 
> 
>
> Key: FLINK-20275
> URL: https://issues.apache.org/jira/browse/FLINK-20275
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Shuiqiang Chen
>Priority: Major
> Fix For: 1.12.0
>
>
> Currently, the path delimiter for add_jars and add_classpaths in Python 
> StreamExecutionEnvironment is ",". This causes the REST client to fail to 
> upload the specified jars and get stuck forever without errors. It should be 
> ";" instead.





[jira] [Created] (FLINK-20275) The path delimiter for add_jars and add_classpaths in Python StreamExecutionEnvironment should be ;

2020-11-22 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-20275:
--

 Summary: The path delimiter for add_jars and add_classpaths in 
Python StreamExecutionEnvironment should be ; 
 Key: FLINK-20275
 URL: https://issues.apache.org/jira/browse/FLINK-20275
 Project: Flink
  Issue Type: Bug
Reporter: Shuiqiang Chen
 Fix For: 1.12.0


Currently, the path delimiter for add_jars and add_classpaths in Python 
StreamExecutionEnvironment is ",". This causes the REST client to fail to 
upload the specified jars and get stuck forever without errors. It should be 
";" instead.
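
As a rough sketch of why the delimiter matters (not the actual PyFlink implementation): if the consumer of the configured value splits it on ";", a ","-joined value is seen as a single path that does not exist. The helper names and jar paths below are hypothetical.

```python
from typing import List

JAR_DELIMITER = ";"  # the delimiter this ticket says should be used


def join_jar_paths(*jar_paths: str) -> str:
    """Join jar URLs into one configuration value using ';'."""
    return JAR_DELIMITER.join(jar_paths)


def split_jar_paths(value: str) -> List[str]:
    """Split a configuration value the way a ';'-based reader would (assumed)."""
    return [path for path in value.split(JAR_DELIMITER) if path]


# Hypothetical jar locations, for illustration only.
jars = ("file:///tmp/a.jar", "file:///tmp/b.jar")

good = join_jar_paths(*jars)  # "file:///tmp/a.jar;file:///tmp/b.jar"
bad = ",".join(jars)          # the behaviour described in this ticket

print(split_jar_paths(good))  # two separate entries
print(split_jar_paths(bad))   # one entry that still contains a comma
```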





[jira] [Comment Edited] (FLINK-20135) Test Python DataStream API

2020-11-19 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17235826#comment-17235826
 ] 

Shuiqiang Chen edited comment on FLINK-20135 at 11/20/20, 1:50 AM:
---

Thank you for creating this ticket. 

I have finished testing the following Python DataStream APIs. All of 
them work as expected under normal conditions and throw exceptions on invalid 
inputs:

 
|get_name|
|name|
|uid|
|set_uid_hash|
|set_parallelism|
|set_max_parallelism|
|get_type|
|get_execution_environment|
|get_execution_config|
|force_non_parallel|
|set_buffer_timeout|
|start_new_chain|
|disable_chaining|
|slot_sharing_group|
|map|
|flat_map|
|key_by|
|filter|
|union|
|connect|
|shuffle|
|project|
|rescale|
|broadcast|
|print|

I will test the following APIs as soon as possible:
|rebalance|
|forward|
|process|
|assign_timestamp_and_watermark|
|partition_custom|
|add_sink|

All these APIs have been documented.


was (Author: csq):
Thank you for creating this ticket. 

I have finished testing the following Python DataStream APIs. All of 
them work as expected under normal conditions and throw exceptions on invalid 
inputs:

 
|get_name|
|name|
|uid|
|set_uid_hash|
|set_parallelism|
|set_max_parallelism|
|get_type|
|get_execution_environment|
|get_execution_config|
|force_non_parallel|
|set_buffer_timeout|
|start_new_chain|
|disable_chaining|
|slot_sharing_group|
|map|
|flat_map|
|key_by|
|filter|
|union|
|connect|
|shuffle|
|project|
|rescale|
|broadcast|
|print|

I will test the following APIs as soon as possible:
|rebalance|
|forward|
|process|
|assign_timestamp_and_watermark|
|partition_custom|
|add_sink|

All these APIs have been documented.

> Test Python DataStream API
> --
>
> Key: FLINK-20135
> URL: https://issues.apache.org/jira/browse/FLINK-20135
> Project: Flink
>  Issue Type: Task
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Assignee: Shuiqiang Chen
>Priority: Critical
> Fix For: 1.12.0
>
>






[jira] [Issue Comment Deleted] (FLINK-20123) Test native support of PyFlink on Kubernetes

2020-11-19 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-20123:
---
Comment: was deleted

(was: Thank you for creating this ticket. 

I have finished testing the following Python DataStream APIs. All of 
them work as expected under normal conditions and throw exceptions on invalid 
inputs:

 
|get_name|
|name|
|uid|
|set_uid_hash|
|set_parallelism|
|set_max_parallelism|
|get_type|
|get_execution_environment|
|get_execution_config|
|force_non_parallel|
|set_buffer_timeout|
|start_new_chain|
|disable_chaining|
|slot_sharing_group|
|map|
|flat_map|
|key_by|
|filter|
|union|
|connect|
|shuffle|
|project|
|rescale|
|broadcast|
|print|

I will test the following APIs as soon as possible:

|rebalance|
|forward|
|process|
|assign_timestamp_and_watermark|
|partition_custom|
|add_sink|

All these APIs have been documented.)

> Test native support of PyFlink on Kubernetes
> 
>
> Key: FLINK-20123
> URL: https://issues.apache.org/jira/browse/FLINK-20123
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Assignee: Shuiqiang Chen
>Priority: Critical
> Fix For: 1.12.0
>
>
> 
> [General Information about the Flink 1.12 release 
> testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing]
> When testing a feature, consider the following aspects:
> - Is the documentation easy to understand
> - Are the error messages, log messages, APIs etc. easy to understand
> - Is the feature working as expected under normal conditions
> - Is the feature working / failing as expected with invalid input, induced 
> errors etc.
> If you find a problem during testing, please file a ticket 
> (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket.
> During the testing keep us updated on tests conducted, or please write a 
> short summary of all things you have tested in the end.





[jira] [Commented] (FLINK-20135) Test Python DataStream API

2020-11-19 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17235826#comment-17235826
 ] 

Shuiqiang Chen commented on FLINK-20135:


Thank you for creating this ticket. 

I have finished testing the following Python DataStream APIs. All of 
them work as expected under normal conditions and throw exceptions on invalid 
inputs:

 
|get_name|
|name|
|uid|
|set_uid_hash|
|set_parallelism|
|set_max_parallelism|
|get_type|
|get_execution_environment|
|get_execution_config|
|force_non_parallel|
|set_buffer_timeout|
|start_new_chain|
|disable_chaining|
|slot_sharing_group|
|map|
|flat_map|
|key_by|
|filter|
|union|
|connect|
|shuffle|
|project|
|rescale|
|broadcast|
|print|

I will test the following APIs as soon as possible:
|rebalance|
|forward|
|process|
|assign_timestamp_and_watermark|
|partition_custom|
|add_sink|

All these APIs have been documented.

> Test Python DataStream API
> --
>
> Key: FLINK-20135
> URL: https://issues.apache.org/jira/browse/FLINK-20135
> Project: Flink
>  Issue Type: Task
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Assignee: Shuiqiang Chen
>Priority: Critical
> Fix For: 1.12.0
>
>






[jira] [Commented] (FLINK-20123) Test native support of PyFlink on Kubernetes

2020-11-19 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17235491#comment-17235491
 ] 

Shuiqiang Chen commented on FLINK-20123:


Thank you for creating this ticket. 

I have finished testing the following Python DataStream APIs. All of 
them work as expected under normal conditions and throw exceptions on invalid 
inputs:

 
|get_name|
|name|
|uid|
|set_uid_hash|
|set_parallelism|
|set_max_parallelism|
|get_type|
|get_execution_environment|
|get_execution_config|
|force_non_parallel|
|set_buffer_timeout|
|start_new_chain|
|disable_chaining|
|slot_sharing_group|
|map|
|flat_map|
|key_by|
|filter|
|union|
|connect|
|shuffle|
|project|
|rescale|
|broadcast|
|print|

I will test the following APIs as soon as possible:

|rebalance|
|forward|
|process|
|assign_timestamp_and_watermark|
|partition_custom|
|add_sink|

All these APIs have been documented.

> Test native support of PyFlink on Kubernetes
> 
>
> Key: FLINK-20123
> URL: https://issues.apache.org/jira/browse/FLINK-20123
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Assignee: Shuiqiang Chen
>Priority: Critical
> Fix For: 1.12.0
>
>
> 
> [General Information about the Flink 1.12 release 
> testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing]
> When testing a feature, consider the following aspects:
> - Is the documentation easy to understand
> - Are the error messages, log messages, APIs etc. easy to understand
> - Is the feature working as expected under normal conditions
> - Is the feature working / failing as expected with invalid input, induced 
> errors etc.
> If you find a problem during testing, please file a ticket 
> (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket.
> During the testing keep us updated on tests conducted, or please write a 
> short summary of all things you have tested in the end.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-19951) PyFlink end-to-end test stuck in "Reading kafka messages"

2020-11-16 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233190#comment-17233190
 ] 

Shuiqiang Chen edited comment on FLINK-19951 at 11/17/20, 1:58 AM:
---

Hi [~dian.fu] [~rmetzger], thank you for reporting this issue. The test gets 
stuck here because the Kafka consumer could not read the expected number of 
messages (15). The root cause might be that the Flink job is not running 
normally. Maybe we could print out the JM log when exiting the test, or limit 
reading from Kafka to a specific time. I will give it a try as soon as possible.
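
A minimal sketch of the "print out the JM log when exiting the test" idea is shown below. It assumes the logs live in one directory and can be matched by glob patterns; the `tail_logs` helper and the patterns in the usage note are assumptions, not the actual e2e test code.

```python
import glob
import os
from collections import deque
from typing import Dict, Iterable, List


def tail_logs(
    log_dir: str, patterns: Iterable[str], max_lines: int = 50
) -> Dict[str, List[str]]:
    """Return the last max_lines lines of every log file matching the patterns."""
    tails: Dict[str, List[str]] = {}
    for pattern in patterns:
        for path in sorted(glob.glob(os.path.join(log_dir, pattern))):
            with open(path, "r", errors="replace") as handle:
                # deque with maxlen keeps only the final lines while streaming.
                last_lines = deque(handle, maxlen=max_lines)
            tails[path] = [line.rstrip("\n") for line in last_lines]
    return tails
```

On a failed or timed-out read, the test could call something like `tail_logs(log_dir, ["*standalonesession*.log", "*taskexecutor*.log"])` and print the result before exiting; the file-name patterns here are assumed and may need adjusting to the actual log layout.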


was (Author: csq):
Hi [~dian.fu], the test gets stuck here because the Kafka consumer could not 
read the expected number of messages (15). The root cause might be that the 
Flink job is not running normally. Maybe we could print out the JM log when 
exiting the test.

> PyFlink end-to-end test stuck in "Reading kafka messages"
> -
>
> Key: FLINK-19951
> URL: https://issues.apache.org/jira/browse/FLINK-19951
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8837=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529
> {code}
> 2020-11-03T08:18:10.2935249Z Nov 03 08:18:10 Test PyFlink DataStream job:
> 2020-11-03T08:18:10.2936216Z Nov 03 08:18:10 Preparing Kafka...
> 2020-11-03T08:18:10.2948091Z Nov 03 08:18:10 Downloading Kafka from 
> https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
> 2020-11-03T08:18:10.3024006Z   % Total% Received % Xferd  Average Speed   
> TimeTime Time  Current
> 2020-11-03T08:18:10.3024610Z  Dload  Upload   
> Total   SpentLeft  Speed
> 2020-11-03T08:18:10.3024891Z 
> 2020-11-03T08:18:10.6563956Z   0 00 00 0  0  0 
> --:--:-- --:--:-- --:--:-- 0
> 2020-11-03T08:18:11.6568328Z   0 54.3M0 327680 0  92275  0  
> 0:10:18 --:--:--  0:10:18 92044
> 2020-11-03T08:18:12.6540430Z  11 54.3M   11 6272k0 0  4626k  0  
> 0:00:12  0:00:01  0:00:11 4625k
> 2020-11-03T08:18:13.6585146Z  23 54.3M   23 12.6M0 0  5521k  0  
> 0:00:10  0:00:02  0:00:08 5521k
> 2020-11-03T08:18:14.6558377Z  36 54.3M   36 19.7M0 0  6018k  0  
> 0:00:09  0:00:03  0:00:06 6017k
> 2020-11-03T08:18:15.6593118Z  49 54.3M   49 26.7M0 0  6297k  0  
> 0:00:08  0:00:04  0:00:04 6297k
> 2020-11-03T08:18:16.653Z  62 54.3M   62 34.0M0 0  6515k  0  
> 0:00:08  0:00:05  0:00:03 6973k
> 2020-11-03T08:18:17.6544951Z  76 54.3M   76 41.8M0 0  6747k  0  
> 0:00:08  0:00:06  0:00:02 7322k
> 2020-11-03T08:18:18.2448109Z  91 54.3M   91 49.7M0 0  6923k  0  
> 0:00:08  0:00:07  0:00:01 7584k
> 2020-11-03T08:18:18.2450531Z 100 54.3M  100 54.3M0 0  7010k  0  
> 0:00:07  0:00:07 --:--:-- 7737k
> 2020-11-03T08:18:20.2751451Z Nov 03 08:18:20 Zookeeper Server has been 
> started ...
> 2020-11-03T08:18:22.0064118Z Nov 03 08:18:22 Waiting for broker...
> 2020-11-03T08:18:25.4758082Z Nov 03 08:18:25 Created topic 
> test-python-data-stream-source.
> 2020-11-03T08:18:25.8324767Z Nov 03 08:18:25 Sending messages to Kafka...
> 2020-11-03T08:18:35.2954788Z Nov 03 08:18:35 >>Created topic 
> test-python-data-stream-sink.
> 2020-11-03T08:18:54.8314099Z Nov 03 08:18:54 Job has been submitted with 
> JobID 1b0c317b47c69ee600937e1715ad9cce
> 2020-11-03T08:18:54.8348757Z Nov 03 08:18:54 Reading kafka messages...
> 2020-11-03T08:53:10.5246998Z 
> ==
> 2020-11-03T08:53:10.5249381Z === WARNING: This E2E Run took already 80% of 
> the allocated time budget of 250 minutes ===
> 2020-11-03T08:53:10.5251343Z 
> ==
> {code}





[jira] [Commented] (FLINK-19951) PyFlink end-to-end test stuck in "Reading kafka messages"

2020-11-16 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233190#comment-17233190
 ] 

Shuiqiang Chen commented on FLINK-19951:


Hi [~dian.fu], the test hangs because the Kafka consumer could not read the 
expected number of messages (15), so it gets stuck here. The root cause might 
be that the Flink job is not running normally. Maybe we could print out the JM 
log when the test exits.






[jira] [Updated] (FLINK-20164) Add docs for ProcessFunction and Timer in Python DataStream API.

2020-11-15 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-20164:
---
Summary: Add docs for ProcessFunction and Timer in Python DataStream API.  
(was: Add docs for ProcessFunction and Timer for Python DataStream API.)

> Add docs for ProcessFunction and Timer in Python DataStream API.
> 
>
> Key: FLINK-20164
> URL: https://issues.apache.org/jira/browse/FLINK-20164
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Documentation
>Reporter: Shuiqiang Chen
>Assignee: Shuiqiang Chen
>Priority: Minor
> Fix For: 1.12.0
>
>






[jira] [Created] (FLINK-20164) Add docs for ProcessFunction and Timer for Python DataStream API.

2020-11-15 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-20164:
--

 Summary: Add docs for ProcessFunction and Timer for Python 
DataStream API.
 Key: FLINK-20164
 URL: https://issues.apache.org/jira/browse/FLINK-20164
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Documentation
Reporter: Shuiqiang Chen
 Fix For: 1.12.0








[jira] [Commented] (FLINK-20135) Test Python DataStream API

2020-11-13 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17231578#comment-17231578
 ] 

Shuiqiang Chen commented on FLINK-20135:


[~dian.fu] Thank you for bringing up the task. Sure, I will do the tests.

> Test Python DataStream API
> --
>
> Key: FLINK-20135
> URL: https://issues.apache.org/jira/browse/FLINK-20135
> Project: Flink
>  Issue Type: Task
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Critical
> Fix For: 1.12.0
>
>






[jira] [Created] (FLINK-20137) User cannot obtain the timestamp of current processing element in the ProcessFunction of Python DataStream API

2020-11-12 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-20137:
--

 Summary: User cannot obtain the timestamp of current processing 
element in the ProcessFunction of Python DataStream API
 Key: FLINK-20137
 URL: https://issues.apache.org/jira/browse/FLINK-20137
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Shuiqiang Chen
 Fix For: 1.12.0


Currently, users cannot obtain the timestamp of the element being processed in 
the ProcessFunction of the Python DataStream API. This is because we forgot to 
emit the existing timestamp of the current record in the upstream operator.
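
The effect can be illustrated with a toy model in plain Python. This is not 
PyFlink code and not the actual operator implementation: Record, upstream_map 
and process_function are hypothetical names used only to show why a downstream 
function sees no timestamp when the upstream operator drops it.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Iterator, List, Optional, Tuple


@dataclass
class Record:
    value: Any
    timestamp: Optional[int]  # event time in ms; None if it was not emitted


def upstream_map(
    records: Iterable[Record],
    fn: Callable[[Any], Any],
    forward_timestamp: bool,
) -> Iterator[Record]:
    """Toy upstream operator: applies fn and optionally keeps the timestamp."""
    for record in records:
        # The reported bug corresponds to forward_timestamp=False.
        timestamp = record.timestamp if forward_timestamp else None
        yield Record(fn(record.value), timestamp)


def process_function(records: Iterable[Record]) -> List[Tuple[Any, Optional[int]]]:
    """Toy downstream function: reports what timestamp it can observe."""
    return [(record.value, record.timestamp) for record in records]
```

In this model, the downstream function only observes timestamps when the 
upstream operator forwards them along with each record.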





[jira] [Created] (FLINK-20016) Support TimestampAssigner and WatermarkGenerator for Python DataStream API.

2020-11-05 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-20016:
--

 Summary: Support TimestampAssigner and WatermarkGenerator for 
Python DataStream API.
 Key: FLINK-20016
 URL: https://issues.apache.org/jira/browse/FLINK-20016
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Shuiqiang Chen
 Fix For: 1.12.0








[jira] [Created] (FLINK-19821) Add ProcessFunction and timer access for Python DataStream API.

2020-10-27 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-19821:
--

 Summary: Add ProcessFunction and timer access for Python 
DataStream API.
 Key: FLINK-19821
 URL: https://issues.apache.org/jira/browse/FLINK-19821
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Shuiqiang Chen
 Fix For: 1.12.0


Add ProcessFunction and timer accessing interfaces for Python DataStream API.





[jira] [Commented] (FLINK-19770) PythonProgramOptionsTest requires package phase before execution

2020-10-22 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219404#comment-17219404
 ] 

Shuiqiang Chen commented on FLINK-19770:


[~juha.mynttinen] Thank you for reporting this issue. We should turn these test 
cases into integration tests that are executed after the package phase has 
completed, as [~chesnay] mentioned. I would like to fix it.

> PythonProgramOptionsTest requires package phase before execution
> 
>
> Key: FLINK-19770
> URL: https://issues.apache.org/jira/browse/FLINK-19770
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.2
>Reporter: Juha Mynttinen
>Priority: Minor
>  Labels: starter
> Fix For: 1.12.0, 1.11.3
>
>
> The PR [https://github.com/apache/flink/pull/13322] lately added the test 
> method  testConfigurePythonExecution in 
> org.apache.flink.client.cli.PythonProgramOptionsTest.
>  
> "mvn clean verify" fails for me in  testConfigurePythonExecution:
>  
> ...
> [INFO] Running org.apache.flink.client.cli.PythonProgramOptionsTest
> [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.433 
> s <<< FAILURE! - in org.apache.flink.client.cli.PythonProgramOptionsTest
> [ERROR] 
> testConfigurePythonExecution(org.apache.flink.client.cli.PythonProgramOptionsTest)
>   Time elapsed: 0.019 s  <<< ERROR!
> java.nio.file.NoSuchFileException: target/dummy-job-jar
> at 
> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
> at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
> at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
> at 
> java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
> at 
> java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:149)
> at 
> java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
> at java.base/java.nio.file.Files.readAttributes(Files.java:1763)
> at 
> java.base/java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219)
> at java.base/java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276)
> at java.base/java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:322)
> at java.base/java.nio.file.Files.walkFileTree(Files.java:2716)
> at java.base/java.nio.file.Files.walkFileTree(Files.java:2796)
> at 
> org.apache.flink.client.cli.PythonProgramOptionsTest.testConfigurePythonExecution(PythonProgramOptionsTest.java:131)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runners.Suite.runChild(Suite.java:128)
> at org.junit.runners.Suite.runChild(Suite.java:27)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
> at 
> 

[jira] [Commented] (FLINK-19484) Kubernetes pyflink application test failed with "error executing jsonpath "{range .items[*]}{.metadata.name}{\"\\n\"}{end}": Error executing template: not in rang"

2020-10-12 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17212208#comment-17212208
 ] 

Shuiqiang Chen commented on FLINK-19484:


[~dian.fu] Thank you for reporting this issue. It might be a matter of 
minikube resource allocation. The image for the native k8s pyflink application 
is rather large, so the pod might take some time to start up. Currently, the 
timeout for waiting for the job manager to become available is only 30s; 
increasing it to 120s might resolve this problem.
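
As a rough sketch of the proposed change, assuming the test waits for the 
deployment with kubectl wait (the "timed out waiting for the condition" message 
in the log suggests this, but it is an assumption), the longer timeout could be 
expressed as below. The helper name build_wait_command is hypothetical.

```python
from typing import List


def build_wait_command(deployment: str, timeout_s: int = 120) -> List[str]:
    """Build a kubectl command that waits for a deployment to be Available.

    The default of 120 seconds is the value proposed in the comment above;
    the test currently uses 30 seconds.
    """
    return [
        "kubectl",
        "wait",
        "--for=condition=Available",
        f"--timeout={timeout_s}s",
        f"deployment/{deployment}",
    ]
```

The resulting list can be passed to subprocess.run(..., check=True), so that a 
timeout surfaces as a failed command rather than a silent hang.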

> Kubernetes pyflink application test failed with "error executing jsonpath 
> "{range .items[*]}{.metadata.name}{\"\\n\"}{end}": Error executing template: 
> not in rang"
> ---
>
> Key: FLINK-19484
> URL: https://issues.apache.org/jira/browse/FLINK-19484
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7139=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529
> {code}
> 2020-09-30T21:14:41.0570715Z 2020-09-30 21:14:41,056 INFO  
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create 
> flink application cluster flink-native-k8s-pyflink-application-1 
> successfully, JobManager Web Interface: http://10.1.0.4:30141
> 2020-09-30T21:15:11.2323195Z error: timed out waiting for the condition on 
> deployments/flink-native-k8s-pyflink-application-1
> 2020-09-30T21:15:11.2384760Z Stopping job timeout watchdog (with pid=111683)
> 2020-09-30T21:15:11.2386729Z Debugging failed Kubernetes test:
> 2020-09-30T21:15:11.2387191Z Currently existing Kubernetes resources
> 2020-09-30T21:15:11.3502610Z NAME 
>  TYPECLUSTER-IP EXTERNAL-IP   PORT(S) AGE
> 2020-09-30T21:15:11.3506194Z service/flink-native-k8s-pyflink-application-1   
>  ClusterIP   None   6123/TCP,6124/TCP   31s
> 2020-09-30T21:15:11.3507403Z 
> service/flink-native-k8s-pyflink-application-1-rest   NodePort
> 10.104.215.1   8081:30141/TCP  31s
> 2020-09-30T21:15:11.3529743Z service/kubernetes   
>  ClusterIP   10.96.0.1  443/TCP 18m
> 2020-09-30T21:15:11.3530391Z 
> 2020-09-30T21:15:11.3531349Z NAME 
> READY   UP-TO-DATE   AVAILABLE   AGE
> 2020-09-30T21:15:11.3532200Z 
> deployment.apps/flink-native-k8s-pyflink-application-1   0/1 0
> 0   31s
> 2020-09-30T21:15:11.4887105Z Name:  
> flink-native-k8s-pyflink-application-1
> 2020-09-30T21:15:11.4887491Z Namespace: default
> 2020-09-30T21:15:11.4888028Z Labels:
> app=flink-native-k8s-pyflink-application-1
> 2020-09-30T21:15:11.4888534Ztype=flink-native-kubernetes
> 2020-09-30T21:15:11.443Z Annotations:   
> 2020-09-30T21:15:11.4890558Z Selector:  
> app=flink-native-k8s-pyflink-application-1,component=jobmanager,type=flink-native-kubernetes
> 2020-09-30T21:15:11.4900945Z Type:  ClusterIP
> 2020-09-30T21:15:11.4901267Z IP:None
> 2020-09-30T21:15:11.4903601Z Port:  jobmanager-rpc  6123/TCP
> 2020-09-30T21:15:11.4903911Z TargetPort:6123/TCP
> 2020-09-30T21:15:11.4904155Z Endpoints: 
> 2020-09-30T21:15:11.4907175Z Port:  blobserver  6124/TCP
> 2020-09-30T21:15:11.4907586Z TargetPort:6124/TCP
> 2020-09-30T21:15:11.4907842Z Endpoints: 
> 2020-09-30T21:15:11.4908063Z Session Affinity:  None
> 2020-09-30T21:15:11.4908298Z Events:
> 2020-09-30T21:15:11.4970434Z 
> 2020-09-30T21:15:11.4970653Z 
> 2020-09-30T21:15:11.4971488Z Name: 
> flink-native-k8s-pyflink-application-1-rest
> 2020-09-30T21:15:11.4971843Z Namespace:default
> 2020-09-30T21:15:11.4972563Z Labels:   
> app=flink-native-k8s-pyflink-application-1
> 2020-09-30T21:15:11.4973297Z   
> type=flink-native-kubernetes
> 2020-09-30T21:15:11.4973749Z Annotations:  
> 2020-09-30T21:15:11.4974568Z Selector: 
> app=flink-native-k8s-pyflink-application-1,component=jobmanager,type=flink-native-kubernetes
> 2020-09-30T21:15:11.4974969Z Type: NodePort
> 2020-09-30T21:15:11.4976265Z IP:   10.104.215.1
> 2020-09-30T21:15:11.4976620Z Port: rest  8081/TCP
> 2020-09-30T21:15:11.4976991Z TargetPort:   8081/TCP
> 

[jira] [Updated] (FLINK-19145) Add PyFlink-walkthrough in Flink playground.

2020-09-06 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-19145:
---
Description: Add the walkthrough for PyFlink in Flink playground.

> Add PyFlink-walkthrough in Flink playground.
> 
>
> Key: FLINK-19145
> URL: https://issues.apache.org/jira/browse/FLINK-19145
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation / Training / Exercises
>Reporter: Shuiqiang Chen
>Priority: Major
> Fix For: 1.12.0
>
>
> Add the walkthrough for PyFlink in Flink playground.





[jira] [Updated] (FLINK-19145) Add PyFlink-walkthrough to Flink playground.

2020-09-06 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-19145:
---
Summary: Add PyFlink-walkthrough to Flink playground.  (was: Add 
PyFlink-walkthrough in Flink playground.)

> Add PyFlink-walkthrough to Flink playground.
> 
>
> Key: FLINK-19145
> URL: https://issues.apache.org/jira/browse/FLINK-19145
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation / Training / Exercises
>Reporter: Shuiqiang Chen
>Priority: Major
> Fix For: 1.12.0
>
>
> Add the walkthrough for PyFlink in Flink playground.




