[jira] [Updated] (FLINK-22994) Improve the performance of nesting udf invoking

2021-07-06 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

In some nesting udf invoking cases, Flink convert the udf result to external 
object and then convert to internalOrNull object as params for next udf 
invoking. The performance of some converter is poor.   
h1. Test Params

Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h1. Performance Compare with MapMapConverter
h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 
k/s

!image-2021-06-15-15-29-09-773.png!
h1. Goal

For some cases, skip toInternalOrNull & toExternal, Use the udf result directly.
h1. Performance Compare without MapMapConverter
h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

In some nesting udf invoking cases, Flink convert the udf result to external 
object and then convert to internalOrNull object as params for next udf 
invoking. The performance of some converter is poor.   
h1. Performance Compare with MapMapConverter

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 
k/s

!image-2021-06-15-15-29-09-773.png!
h1. Goal

For some cases, skip toInternalOrNull & toExternal, Use the udf result directly.
h1. Performance Compare without MapMapConverter
h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of nesting udf invoking
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. BackGround
> In some nesting udf invoking cases, Flink convert the udf result to external 
> object and then convert to internalOrNull object as params for next udf 
> invoking. The performance of some converter is poor.   
> h1. Test Params
> Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h1. Performance Compare with MapMapConverter
> h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 
> 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h1. Goal
> For some cases, skip toInternalOrNull & toExternal, Use the udf result 
> directly.
> h1. Performance Compare without MapMapConverter
> h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of nesting udf invoking

2021-07-06 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

In some nesting udf invoking cases, Flink convert the udf result to external 
object and then convert to internalOrNull object as params for next udf 
invoking. The performance of some converter is poor.   
h1. Performance Compare with MapMapConverter

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 
k/s

!image-2021-06-15-15-29-09-773.png!
h1. Goal

For some cases, skip toInternalOrNull & toExternal, Use the udf result directly.
h1. Performance Compare without MapMapConverter
h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

In some nesting udf invoking cases, Flink convert the udf result to external 
object and then convert to internalOrNull object as params for next udf 
invoking. The performance of some converter is poor.   
h1. Performance Compare with MapMapConverter & without MapMapConverter

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 
k/s

!image-2021-06-15-15-29-09-773.png!
h1. Goal

For some cases, skip toInternalOrNull
h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of nesting udf invoking
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. BackGround
> In some nesting udf invoking cases, Flink convert the udf result to external 
> object and then convert to internalOrNull object as params for next udf 
> invoking. The performance of some converter is poor.   
> h1. Performance Compare with MapMapConverter
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 
> 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h1. Goal
> For some cases, skip toInternalOrNull & toExternal, Use the udf result 
> directly.
> h1. Performance Compare without MapMapConverter
> h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of nesting udf invoking

2021-07-06 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

In some nesting udf invoking cases, Flink convert the udf result to external 
object and then convert to internalOrNull object as params for next udf 
invoking. The performance of some converter is poor.   
h1. Performance Compare with MapMapConverter & without MapMapConverter

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 
k/s

!image-2021-06-15-15-29-09-773.png!
h1. Goal

For some cases, skip toInternalOrNull
h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

In some nesting udf invoking cases, Flink convert the udf result to external 
object and then convert to internalOrNull object as params for next udf 
invoking. The performance of some converter is poor.   
h1. Performance Compare with MapMapConverter & without MapMapConverter

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 
k/s

!image-2021-06-15-15-29-09-773.png!
h1. Goal

For some cases, skip the 
h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of nesting udf invoking
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. BackGround
> In some nesting udf invoking cases, Flink convert the udf result to external 
> object and then convert to internalOrNull object as params for next udf 
> invoking. The performance of some converter is poor.   
> h1. Performance Compare with MapMapConverter & without MapMapConverter
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 
> 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h1. Goal
> For some cases, skip toInternalOrNull
> h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of nesting udf invoking

2021-07-06 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

In some nesting udf invoking cases, Flink convert the udf result to external 
object and then convert to internalOrNull object as params for next udf 
invoking. The performance of some converter is poor.   
h1. Performance Compare with MapMapConverter & without MapMapConverter

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 
k/s

!image-2021-06-15-15-29-09-773.png!
h1. Goal

For some cases, skip the 
h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

In some nesting udf invoking cases, Flink convert the udf result to external 
object and then convert to internalOrNull object as params for next udf 
invoking  
h1. Performance Compare with MapMapConverter & without MapMapConverter

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 
k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of nesting udf invoking
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. BackGround
> In some nesting udf invoking cases, Flink convert the udf result to external 
> object and then convert to internalOrNull object as params for next udf 
> invoking. The performance of some converter is poor.   
> h1. Performance Compare with MapMapConverter & without MapMapConverter
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 
> 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h1. Goal
> For some cases, skip the 
> h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of nesting udf invoking

2021-06-21 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Summary: Improve the performance of nesting udf invoking  (was: Improve the 
performance of invoking nesting udf)

> Improve the performance of nesting udf invoking
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. BackGround
> In some nesting udf invoking cases, Flink convert the udf result to external 
> object and then convert to internalOrNull object as params for next udf 
> invoking  
> h1. Performance Compare with MapMapConverter & without MapMapConverter
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 
> 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-21 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

In some nesting udf invoking cases, Flink convert the udf result to external 
object and then convert to internalOrNull object as params for next udf 
invoking  
h1. Performance Compare with MapMapConverter & without MapMapConverter

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 
k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

In some nesting udf invoking cases, Flink convert the udf result to external 
object and then convert to internalOrNull object as params for next udf 
invoking  
h1. Performance Compare with MapMapConverter & without MapMapConverter

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. BackGround
> In some nesting udf invoking cases, Flink convert the udf result to external 
> object and then convert to internalOrNull object as params for next udf 
> invoking  
> h1. Performance Compare with MapMapConverter & without MapMapConverter
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 
> 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-21 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

In some nesting udf invoking cases, Flink convert the udf result to external 
object and then convert to internalOrNull object as params for next udf 
invoking  
h1. Performance Compare with MapMapConverter & without MapMapConverter

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

In some nesting udf invoking cases, Flink convert the udf result to external 
object and then convert to internalOrNull object as params for next udf 
invoking  
h1. Performance Compare with MapMapConverter & without MapMapConverter

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 k/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. BackGround
> In some nesting udf invoking cases, Flink convert the udf result to external 
> object and then convert to internalOrNull object as params for next udf 
> invoking  
> h1. Performance Compare with MapMapConverter & without MapMapConverter
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-21 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

In some nesting udf invoking cases, Flink convert the udf result to external 
object and then convert to internalOrNull object as params for next udf 
invoking  
h1. Performance Compare with MapMapConverter & without MapMapConverter

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 k/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

Flink maintain the udf result as BinaryData, like BinaryStringData. When 
invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be toInternalOrNull and toExternal.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!   This issue will improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 k/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. BackGround
> In some nesting udf invoking cases, Flink convert the udf result to external 
> object and then convert to internalOrNull object as params for next udf 
> invoking  
> h1. Performance Compare with MapMapConverter & without MapMapConverter
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364691#comment-17364691
 ] 

lynn1.zhang edited comment on FLINK-22994 at 6/17/21, 4:42 AM:
---

 
{code:java}
public static void main(String[] args) {
final int totalTestCount = 100 * 1;
System.out.println(BinaryStringData.fromString(""));
System.out.println(
"Total test count:"
+ totalTestCount
+ ", The spend with MapMapConverter "
+ testWithMapMapConverter(totalTestCount)
+ " ms.");
System.out.println(
"Total test count:"
+ totalTestCount
+ ", The spend without Converter "
+ testWithOutMapMapConverter(totalTestCount)
+ " ms.");
}{code}
Amazing code, Thank you for your advice. 

I think create another GeneratedExpression that extend GeneratedExpression is a 
good idea. 


was (Author: zicat):
public static void main(String[] args) {final int totalTestCount = 100 * 
1;System.out.println(BinaryStringData.fromString(""));
System.out.println("Total test count:"+ 
totalTestCount
+ ", The spend with MapMapConverter "+ 
testWithMapMapConverter(totalTestCount)
+ " ms.");System.out.println("Total test 
count:"+ totalTestCount
+ ", The spend without Converter "+ 
testWithOutMapMapConverter(totalTestCount)
+ " ms.");
}
Amazing code, Thank you for your advice. 

I think create another GeneratedExpression that extend GeneratedExpression is a 
good idea. 

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be toInternalOrNull and toExternal.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364691#comment-17364691
 ] 

lynn1.zhang commented on FLINK-22994:
-

public static void main(String[] args) {final int totalTestCount = 100 * 
1;System.out.println(BinaryStringData.fromString(""));
System.out.println("Total test count:"+ 
totalTestCount
+ ", The spend with MapMapConverter "+ 
testWithMapMapConverter(totalTestCount)
+ " ms.");System.out.println("Total test 
count:"+ totalTestCount
+ ", The spend without Converter "+ 
testWithOutMapMapConverter(totalTestCount)
+ " ms.");
}
Amazing code, Thank you for your advice. 

I think create another GeneratedExpression that extend GeneratedExpression is a 
good idea. 

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be toInternalOrNull and toExternal.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

Flink maintain the udf result as BinaryData, like BinaryStringData. When 
invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be toInternalOrNull and toExternal.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!   This issue will improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 k/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

Flink maintain the udf result as BinaryData, like BinaryStringData. When 
invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!   This issue will improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 k/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be toInternalOrNull and toExternal.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364666#comment-17364666
 ] 

lynn1.zhang commented on FLINK-22994:
-

[^StringConverterTest.java] I also test the StringStringConverter, The 
performance is not good.

Total test count:100, The spend with MapMapConverter 7243 ms.
Total test count:100, The spend without Converter 72 ms.

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Attachment: StringConverterTest.java

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364658#comment-17364658
 ] 

lynn1.zhang commented on FLINK-22994:
-

Hi [~lzljs3620320], ipip return the type of map, so the converter is the 
MapMapConverter not StringStringConverter.

[^Test.java] this is my benchmark case which can show the cost difference 
between with converter and without converter.


Total test count:100, The spend with MapMapConverter 12160 ms.
Total test count:100, The spend without Converter 52 ms. 

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: Test.java, image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png, 
> new_projection_code, old_projection_code, test.sql
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Attachment: Test.java

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: Test.java, image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png, 
> new_projection_code, old_projection_code, test.sql
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364195#comment-17364195
 ] 

lynn1.zhang commented on FLINK-22994:
-

[~lzljs3620320] I upload the  [^test.sql] with the [^old_projection_code] and 
[^new_projection_code] .

 Before this PR,  udf ipip result(map type) will be  
converter$4.toInternalOrNull and converter$4.toExternal 4 times. 

In [^new_projection_code],  converter$4.toInternalOrNull and 
converter$4.toExternal never invoke.

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png, 
> new_projection_code, old_projection_code, test.sql
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Attachment: old_projection_code
new_projection_code

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png, 
> new_projection_code, old_projection_code, test.sql
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Attachment: test.sql

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png, 
> new_projection_code, old_projection_code, test.sql
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-15 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

Flink maintain the udf result as BinaryData, like BinaryStringData. When 
invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!   This issue will improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 k/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

Flink maintain the udf result as BinaryData, like BinaryStringData. When 
invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 k/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-15 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

Flink maintain the udf result as BinaryData, like BinaryStringData. When 
invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 k/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

Flink maintain the udf result as BinaryData, like BinaryStringData. When 
invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 w/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-15 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

Flink maintain the udf result as BinaryData, like BinaryStringData. When 
invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 w/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

Flink maintain the udf result as BinaryData, like BinaryStringData. When 
invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 w/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after the issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 w/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-15 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

Flink maintain the udf result as BinaryData, like BinaryStringData. When 
invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 w/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after the issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

Flink maintain the udf result as BinaryData, like BinaryStringData. When 
invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 
1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
CPU @ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 w/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after the issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 w/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after the issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-15 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

Flink maintain the udf result as BinaryData, like BinaryStringData. When 
invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 
1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
CPU @ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 w/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after the issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

Flink maintain the udf result as binary, like BinaryStringData. When invoking 
nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 
1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
CPU @ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 w/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after the issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png
>
>
> h1. BackGround
> Flink maintain the udf result as BinaryData, like BinaryStringData. When 
> invoking nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 
> 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 w/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after the issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-15 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

Flink maintain the udf result as binary, like BinaryStringData. When invoking 
nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 
1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
CPU @ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 w/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after the issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

Flink maintain the udf result as binary, like BinaryStringData. When invoking 
nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 
1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
CPU @ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 w/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-28-28-137.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after the issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png
>
>
> h1. BackGround
> Flink maintain the udf result as binary, like BinaryStringData. When invoking 
> nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 
> 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 w/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after the issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-15 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Attachment: image-2021-06-15-15-42-08-065.png

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png
>
>
> h1. BackGround
> Flink maintain the udf result as binary, like BinaryStringData. When invoking 
> nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 
> 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 w/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-28-28-137.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after the issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-15 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. BackGround

Flink maintain the udf result as binary, like BinaryStringData. When invoking 
nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 
1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
CPU @ 2.30GHz
 UDF Introduction:
 * ipip:  input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input map ip_info, output: string country.
 * ip_2_region: input  map ip_info, output: string region.
 * ip_2_isp_domain: input  map ip_info, output: string isp.
 * ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 w/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s

!image-2021-06-15-15-28-28-137.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after the issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. BackGround

Flink maintain the udf result as binary, like BinaryStringData. When invoking 
nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 
1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
CPU @ 2.30GHz
UDF Introduction:
 * ipip:  input: int ip,output: map ip_info,map size = 14。
 * ip_2_country: input map ip_info,output: string country。
 * ip_2_region: input  map ip_info,output: string region。
 * ip_2_isp_domain: input  map ip_info,output: string isp。
 * ip_2_timezone: input map ip_info,output: string timezone。

h5. The throughput without udf invoke: 764.50 w/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s
!image-2021-06-15-15-28-28-137.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after the issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png
>
>
> h1. BackGround
> Flink maintain the udf result as binary, like BinaryStringData. When invoking 
> nesting udf like select useless(int_ip_2_string(ip)), the result of 
> int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 
> 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>  UDF Introduction:
>  * ipip:  input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input map ip_info, output: string country.
>  * ip_2_region: input  map ip_info, output: string region.
>  * ip_2_isp_domain: input  map ip_info, output: string isp.
>  * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 w/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-28-28-137.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after the issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-15 Thread lynn1.zhang (Jira)
lynn1.zhang created FLINK-22994:
---

 Summary: Improve the performance of invoking nesting udf
 Key: FLINK-22994
 URL: https://issues.apache.org/jira/browse/FLINK-22994
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.12.4
 Environment: h5.  
Reporter: lynn1.zhang
 Attachments: image-2021-06-15-15-18-12-619.png, 
image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
image-2021-06-15-15-30-14-775.png

h1. BackGround

Flink maintain the udf result as binary, like BinaryStringData. When invoking 
nesting udf like select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) will be serialization and deserialization.

Below is the Generated Code

!image-2021-06-15-15-18-12-619.png!  this issue want to improve it as below

!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare

Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 
1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
CPU @ 2.30GHz
UDF Introduction:
 * ipip:  input: int ip,output: map ip_info,map size = 14。
 * ip_2_country: input map ip_info,output: string country。
 * ip_2_region: input  map ip_info,output: string region。
 * ip_2_isp_domain: input  map ip_info,output: string isp。
 * ip_2_timezone: input map ip_info,output: string timezone。

h5. The throughput without udf invoke: 764.50 w/s

!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s
!image-2021-06-15-15-28-28-137.png!
h5. The throughput with udf nesting invoke: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after the issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157

2021-05-10 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17341677#comment-17341677
 ] 

lynn1.zhang commented on FLINK-21345:
-

Hi [~Leonard Xu], I have updated this PR.

> NullPointerException 
> LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
> --
>
> Key: FLINK-21345
> URL: https://issues.apache.org/jira/browse/FLINK-21345
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.1
> Environment: Planner: BlinkPlanner
> Flink Version: 1.12.1_2.11
> Java Version: 1.8
> OS: mac os
>Reporter: lynn1.zhang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: image-2021-02-10-16-00-45-553.png
>
>
> First Step: Create 2 Source Tables as below:
> {code:java}
> CREATE TABLE test_streaming(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> CREATE TABLE test_streaming2(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming2',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> {code}
> Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, 
> timestamp:proctime()
> Third Step: test_streaming union all  test_streaming2 join dim like below:
> {code:java}
> SELECT r.vid,d.name,timestamp_from_long(r.ts)
> FROM (
> SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
> ) AS r,
> LATERAL TABLE (dim(r.proc)) AS d
> WHERE r.vid = d.vid;
> {code}
> Exception Detail: (if only use test-streaming or test-streaming2 join 
> temporary table function, the program run ok)
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>   at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at 

[jira] [Updated] (FLINK-21833) TemporalRowTimeJoinOperator.java will lead to the state expansion by short-life-cycle & huge RowData, although config idle.state.retention.time

2021-03-22 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-21833:

Description: 
In our company cases, short-life-cycle & huge RowData will use this operator to 
join each other. Every key called session_id will be expired after 2min.

With idle.state.retention.time configuration, after 2min if not used, the 
leftState and rightState will be cleaned up by the operator but nextLeftIndex & 
registeredTimer state data will be stored forever.

After running a day(about 20 million session_id in our company cases), the 
checkpoint operator will cause the job crash.

I have found the bug, and fixed it.

!image-2021-03-17-11-06-21-768.png!

  was:
In our company cases, short-life-cycle & huge RowData will use this operator to 
join each other. Every key called session_id will be expired after 2min.

With idle.state.retention.time configuration, after 2min, the leftState and 
rightState will be cleaned up by the operator but nextLeftIndex & 
registeredTimer state data will be stored forever.

After running a day(about 20 million session_id in our company cases), the 
checkpoint operator will cause the job crash.

I have found the bug, and fixed it.

!image-2021-03-17-11-06-21-768.png!


> TemporalRowTimeJoinOperator.java will lead to the state expansion by 
> short-life-cycle & huge RowData, although config idle.state.retention.time
> ---
>
> Key: FLINK-21833
> URL: https://issues.apache.org/jira/browse/FLINK-21833
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.2
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-03-17-11-06-21-768.png
>
>
> In our company cases, short-life-cycle & huge RowData will use this operator 
> to join each other. Every key called session_id will be expired after 2min.
> With idle.state.retention.time configuration, after 2min if not used, the 
> leftState and rightState will be cleaned up by the operator but nextLeftIndex 
> & registeredTimer state data will be stored forever.
> After running a day(about 20 million session_id in our company cases), the 
> checkpoint operator will cause the job crash.
> I have found the bug, and fixed it.
> !image-2021-03-17-11-06-21-768.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21833) TemporalRowTimeJoinOperator.java will lead to the state expansion by short-life-cycle & huge RowData, although config idle.state.retention.time

2021-03-21 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-21833:

Description: 
In our company cases, short-life-cycle & huge RowData will use this operator to 
join each other. Every key called session_id will be expired after 2min.

With idle.state.retention.time configuration, after 2min, the leftState and 
rightState will be cleaned up by the operator but nextLeftIndex & 
registeredTimer state data will be stored forever.

After running a day(about 20 million session_id in our company cases), the 
checkpoint operator will cause the job crash.

I have found the bug, and fixed it.

!image-2021-03-17-11-06-21-768.png!

  was:
In our company cases, short-life-cycle & huge RowData will use this operator to 
join each other. Every key called session_id will be expired after 2min.

With idle.state.retention.time configuration, after 2min, the leftState and 
rightState will be cleaned up by the operator but nextLeftIndex & 
registeredTimer state data will be stored forever.

After running a day(about 20 million session_id in our cases ), the checkpoint 
operator will cause the job crash.

I have found the bug, and fixed it.

!image-2021-03-17-11-06-21-768.png!


> TemporalRowTimeJoinOperator.java will lead to the state expansion by 
> short-life-cycle & huge RowData, although config idle.state.retention.time
> ---
>
> Key: FLINK-21833
> URL: https://issues.apache.org/jira/browse/FLINK-21833
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.2
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-03-17-11-06-21-768.png
>
>
> In our company cases, short-life-cycle & huge RowData will use this operator 
> to join each other. Every key called session_id will be expired after 2min.
> With idle.state.retention.time configuration, after 2min, the leftState and 
> rightState will be cleaned up by the operator but nextLeftIndex & 
> registeredTimer state data will be stored forever.
> After running a day(about 20 million session_id in our company cases), the 
> checkpoint operator will cause the job crash.
> I have found the bug, and fixed it.
> !image-2021-03-17-11-06-21-768.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21833) TemporalRowTimeJoinOperator.java will lead to the state expansion by short-life-cycle & huge RowData, although config idle.state.retention.time

2021-03-21 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-21833:

Description: 
In our company cases, short-life-cycle & huge RowData will use this operator to 
join each other. Every key called session_id will be expired after 2min.

With idle.state.retention.time configuration, after 2min, the leftState and 
rightState will be cleaned up by the operator but nextLeftIndex & 
registeredTimer state data will be stored forever.

After running a day(about 20 million session_id in our cases ), the checkpoint 
operator will cause the job crash.

I have found the bug, and fixed it.

!image-2021-03-17-11-06-21-768.png!

  was:
In our company cases, short-life-cycle & huge RowData will use this operator to 
join each other. Every key called session_id will be expired after 2min.

With idle.state.retention.time configuration, after 2min, the leftState and 
rightState will be cleaned up by the operator but nextLeftIndex & 
registeredTimer will be stored forever.

After running a day(about 20 million session_id in our cases ), the checkpoint 
operator will cause the job crash.

I have found the bug, and fixed it.

!image-2021-03-17-11-06-21-768.png!


> TemporalRowTimeJoinOperator.java will lead to the state expansion by 
> short-life-cycle & huge RowData, although config idle.state.retention.time
> ---
>
> Key: FLINK-21833
> URL: https://issues.apache.org/jira/browse/FLINK-21833
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.2
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-03-17-11-06-21-768.png
>
>
> In our company cases, short-life-cycle & huge RowData will use this operator 
> to join each other. Every key called session_id will be expired after 2min.
> With idle.state.retention.time configuration, after 2min, the leftState and 
> rightState will be cleaned up by the operator but nextLeftIndex & 
> registeredTimer state data will be stored forever.
> After running a day(about 20 million session_id in our cases ), the 
> checkpoint operator will cause the job crash.
> I have found the bug, and fixed it.
> !image-2021-03-17-11-06-21-768.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21833) TemporalRowTimeJoinOperator.java will lead to the state expansion by short-life-cycle & huge RowData, although config idle.state.retention.time

2021-03-21 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-21833:

Description: 
In our company cases, short-life-cycle & huge RowData will use this operator to 
join each other. Every key called session_id will be expired after 2min.

With idle.state.retention.time configuration, after 2min, the leftState and 
rightState will be cleaned up by the operator but nextLeftIndex & 
registeredTimer will be stored forever.

After running a day(about 20 million session_id in our cases ), the checkpoint 
operator will cause the job crash.

I have found the bug, and fixed it.

!image-2021-03-17-11-06-21-768.png!

  was:
In our company cases, short-life-cycle & huge RowData will use this operator to 
join each other. Every key called session_id will be expired after 2min.

With idle.state.retention.time configuration, after 2min passed, the leftState 
and rightState will be cleaned up by the operator but nextLeftIndex & 
registeredTimer will be stored forever.

After running a day(about 20 million session_id in our cases ), the checkpoint 
operator will cause the job crash.

I have found the bug, and fixed it.

!image-2021-03-17-11-06-21-768.png!


> TemporalRowTimeJoinOperator.java will lead to the state expansion by 
> short-life-cycle & huge RowData, although config idle.state.retention.time
> ---
>
> Key: FLINK-21833
> URL: https://issues.apache.org/jira/browse/FLINK-21833
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.2
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-03-17-11-06-21-768.png
>
>
> In our company cases, short-life-cycle & huge RowData will use this operator 
> to join each other. Every key called session_id will be expired after 2min.
> With idle.state.retention.time configuration, after 2min, the leftState and 
> rightState will be cleaned up by the operator but nextLeftIndex & 
> registeredTimer will be stored forever.
> After running a day(about 20 million session_id in our cases ), the 
> checkpoint operator will cause the job crash.
> I have found the bug, and fixed it.
> !image-2021-03-17-11-06-21-768.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21833) TemporalRowTimeJoinOperator.java will lead to the state expansion by short-life-cycle & huge RowData, although config idle.state.retention.time

2021-03-21 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-21833:

Description: 
In our company cases, short-life-cycle & huge RowData will use this operator to 
join each other. Every key called session_id will be expired after 2min.

With idle.state.retention.time configuration, after 2min passed, the leftState 
and rightState will be cleaned up by the operator but nextLeftIndex & 
registeredTimer will be stored forever.

After running a day(about 20 million session_id in our cases ), the checkpoint 
operator will cause the job crash.

I have found the bug, and fixed it.

!image-2021-03-17-11-06-21-768.png!

  was:
TemporalRowTimeJoinOperator feature will lead to unlimited data expansion, 
although configure idle.state.retention.time

I have found the bug, and fixed it.

!image-2021-03-17-11-06-21-768.png!

Summary: TemporalRowTimeJoinOperator.java will lead to the state 
expansion by short-life-cycle & huge RowData, although config 
idle.state.retention.time  (was: TemporalRowTimeJoinOperator State Leak 
Although configure idle.state.retention.time)

> TemporalRowTimeJoinOperator.java will lead to the state expansion by 
> short-life-cycle & huge RowData, although config idle.state.retention.time
> ---
>
> Key: FLINK-21833
> URL: https://issues.apache.org/jira/browse/FLINK-21833
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.2
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-03-17-11-06-21-768.png
>
>
> In our company cases, short-life-cycle & huge RowData will use this operator 
> to join each other. Every key called session_id will be expired after 2min.
> With idle.state.retention.time configuration, after 2min passed, the 
> leftState and rightState will be cleaned up by the operator but nextLeftIndex 
> & registeredTimer will be stored forever.
> After running a day(about 20 million session_id in our cases ), the 
> checkpoint operator will cause the job crash.
> I have found the bug, and fixed it.
> !image-2021-03-17-11-06-21-768.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21833) TemporalRowTimeJoinOperator State Leak Although configure idle.state.retention.time

2021-03-17 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17303838#comment-17303838
 ] 

lynn1.zhang commented on FLINK-21833:
-

Please take a look at this. We depend on this in our production environment. 
Thank you very much. [~Leonard Xu]

> TemporalRowTimeJoinOperator State Leak Although configure 
> idle.state.retention.time
> ---
>
> Key: FLINK-21833
> URL: https://issues.apache.org/jira/browse/FLINK-21833
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.2
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-03-17-11-06-21-768.png
>
>
> TemporalRowTimeJoinOperator feature will lead to unlimited data expansion, 
> although configure idle.state.retention.time
> I have found the bug, and fixed it.
> !image-2021-03-17-11-06-21-768.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21833) TemporalRowTimeJoinOperator State Leak Although configure idle.state.retention.time

2021-03-16 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-21833:

Description: 
TemporalRowTimeJoinOperator feature will lead to unlimited data expansion, 
although configure idle.state.retention.time

I have found the bug, and fixed it.

!image-2021-03-17-11-06-21-768.png!

  was:
Use TemporalRowTimeJoinOperator feature will lead to unlimited data expansion, 
although configure idle.state.retention.time

I have found the bug, and fixed it.

!image-2021-03-17-11-06-21-768.png!


> TemporalRowTimeJoinOperator State Leak Although configure 
> idle.state.retention.time
> ---
>
> Key: FLINK-21833
> URL: https://issues.apache.org/jira/browse/FLINK-21833
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.2
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-03-17-11-06-21-768.png
>
>
> TemporalRowTimeJoinOperator feature will lead to unlimited data expansion, 
> although configure idle.state.retention.time
> I have found the bug, and fixed it.
> !image-2021-03-17-11-06-21-768.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21833) TemporalRowTimeJoinOperator State Leak Although configure idle.state.retention.time

2021-03-16 Thread lynn1.zhang (Jira)
lynn1.zhang created FLINK-21833:
---

 Summary: TemporalRowTimeJoinOperator State Leak Although configure 
idle.state.retention.time
 Key: FLINK-21833
 URL: https://issues.apache.org/jira/browse/FLINK-21833
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.12.2
Reporter: lynn1.zhang
 Attachments: image-2021-03-17-11-06-21-768.png

Use TemporalRowTimeJoinOperator feature will lead to unlimited data expansion, 
although configure idle.state.retention.time

I have found the bug, and fixed it.

!image-2021-03-17-11-06-21-768.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157

2021-02-17 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17282291#comment-17282291
 ] 

lynn1.zhang edited comment on FLINK-21345 at 2/17/21, 10:36 AM:


[~jark] [~Leonard Xu] 
 !image-2021-02-10-16-00-45-553.png!

I try to add the code of the method getRelOptSchema in 
LogicalCorrelateToJoinFromTemporalTableFunctionRule.java, It seems run ok.


was (Author: zicat):
[~jark] [~Leonard Xu] 
 !image-2021-02-10-16-00-45-553.png!

I add the code of the method getRelOptSchema in 
LogicalCorrelateToJoinFromTemporalTableFunctionRule, It seems run ok.

Can I create a merge request to fix the issue?

> NullPointerException 
> LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
> --
>
> Key: FLINK-21345
> URL: https://issues.apache.org/jira/browse/FLINK-21345
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.1
> Environment: Planner: BlinkPlanner
> Flink Version: 1.12.1_2.11
> Java Version: 1.8
> OS: mac os
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-02-10-16-00-45-553.png
>
>
> First Step: Create 2 Source Tables as below:
> {code:java}
> CREATE TABLE test_streaming(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> CREATE TABLE test_streaming2(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming2',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> {code}
> Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, 
> timestamp:proctime()
> Third Step: test_streaming union all  test_streaming2 join dim like below:
> {code:java}
> SELECT r.vid,d.name,timestamp_from_long(r.ts)
> FROM (
> SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
> ) AS r,
> LATERAL TABLE (dim(r.proc)) AS d
> WHERE r.vid = d.vid;
> {code}
> Exception Detail: (if only use test-streaming or test-streaming2 join 
> temporary table function, the program run ok)
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>   at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> 

[jira] [Comment Edited] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157

2021-02-10 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17282291#comment-17282291
 ] 

lynn1.zhang edited comment on FLINK-21345 at 2/10/21, 8:51 AM:
---

[~jark] [~Leonard Xu] 
 !image-2021-02-10-16-00-45-553.png!

I add the code of the method getRelOptSchema in 
LogicalCorrelateToJoinFromTemporalTableFunctionRule, It seems run ok.

Can I create a merge request to fix the issue?


was (Author: zicat):
[~jark] [~Leonard Xu] 
 !image-2021-02-10-16-00-45-553.png!

I add the code of the method getRelOptSchema in 
LogicalCorrelateToJoinFromTemporalTableFunctionRule, It seems run ok.

Can I create a merge request to fix the issue. 

> NullPointerException 
> LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
> --
>
> Key: FLINK-21345
> URL: https://issues.apache.org/jira/browse/FLINK-21345
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.1
> Environment: Planner: BlinkPlanner
> Flink Version: 1.12.1_2.11
> Java Version: 1.8
> OS: mac os
>Reporter: lynn1.zhang
>Priority: Major
> Attachments: image-2021-02-10-16-00-45-553.png
>
>
> First Step: Create 2 Source Tables as below:
> {code:java}
> CREATE TABLE test_streaming(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> CREATE TABLE test_streaming2(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming2',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> {code}
> Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, 
> timestamp:proctime()
> Third Step: test_streaming union all  test_streaming2 join dim like below:
> {code:java}
> SELECT r.vid,d.name,timestamp_from_long(r.ts)
> FROM (
> SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
> ) AS r,
> LATERAL TABLE (dim(r.proc)) AS d
> WHERE r.vid = d.vid;
> {code}
> Exception Detail: (if only use test-streaming or test-streaming2 join 
> temporary table function, the program run ok)
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>   at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> 

[jira] [Comment Edited] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157

2021-02-10 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17282291#comment-17282291
 ] 

lynn1.zhang edited comment on FLINK-21345 at 2/10/21, 8:51 AM:
---

[~jark] [~Leonard Xu] 
 !image-2021-02-10-16-00-45-553.png!

I add the code of the method getRelOptSchema in 
LogicalCorrelateToJoinFromTemporalTableFunctionRule, It seems run ok.

Can I create a merge request to fix the issue. 


was (Author: zicat):
[~jark] [~Leonard Xu] 
!image-2021-02-10-16-00-45-553.png!

I add the code of the method getRelOptSchema in 
LogicalCorrelateToJoinFromTemporalTableFunctionRule, It seems run ok.

 

> NullPointerException 
> LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
> --
>
> Key: FLINK-21345
> URL: https://issues.apache.org/jira/browse/FLINK-21345
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.1
> Environment: Planner: BlinkPlanner
> Flink Version: 1.12.1_2.11
> Java Version: 1.8
> OS: mac os
>Reporter: lynn1.zhang
>Priority: Major
> Attachments: image-2021-02-10-16-00-45-553.png
>
>
> First Step: Create 2 Source Tables as below:
> {code:java}
> CREATE TABLE test_streaming(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> CREATE TABLE test_streaming2(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming2',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> {code}
> Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, 
> timestamp:proctime()
> Third Step: test_streaming union all  test_streaming2 join dim like below:
> {code:java}
> SELECT r.vid,d.name,timestamp_from_long(r.ts)
> FROM (
> SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
> ) AS r,
> LATERAL TABLE (dim(r.proc)) AS d
> WHERE r.vid = d.vid;
> {code}
> Exception Detail: (if only use test-streaming or test-streaming2 join 
> temporary table function, the program run ok)
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>   at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>   at 
> 

[jira] [Commented] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157

2021-02-10 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17282291#comment-17282291
 ] 

lynn1.zhang commented on FLINK-21345:
-

[~jark] [~Leonard Xu] 
!image-2021-02-10-16-00-45-553.png!

I add the code of the method getRelOptSchema in 
LogicalCorrelateToJoinFromTemporalTableFunctionRule, It seems run ok.

 

> NullPointerException 
> LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
> --
>
> Key: FLINK-21345
> URL: https://issues.apache.org/jira/browse/FLINK-21345
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.1
> Environment: Planner: BlinkPlanner
> Flink Version: 1.12.1_2.11
> Java Version: 1.8
> OS: mac os
>Reporter: lynn1.zhang
>Priority: Major
> Attachments: image-2021-02-10-16-00-45-553.png
>
>
> First Step: Create 2 Source Tables as below:
> {code:java}
> CREATE TABLE test_streaming(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> CREATE TABLE test_streaming2(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming2',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> {code}
> Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, 
> timestamp:proctime()
> Third Step: test_streaming union all  test_streaming2 join dim like below:
> {code:java}
> SELECT r.vid,d.name,timestamp_from_long(r.ts)
> FROM (
> SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
> ) AS r,
> LATERAL TABLE (dim(r.proc)) AS d
> WHERE r.vid = d.vid;
> {code}
> Exception Detail: (if only use test-streaming or test-streaming2 join 
> temporary table function, the program run ok)
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>   at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at 
> 

[jira] [Updated] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157

2021-02-10 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-21345:

Attachment: image-2021-02-10-16-00-45-553.png

> NullPointerException 
> LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
> --
>
> Key: FLINK-21345
> URL: https://issues.apache.org/jira/browse/FLINK-21345
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.1
> Environment: Planner: BlinkPlanner
> Flink Version: 1.12.1_2.11
> Java Version: 1.8
> OS: mac os
>Reporter: lynn1.zhang
>Priority: Major
> Attachments: image-2021-02-10-16-00-45-553.png
>
>
> First Step: Create 2 Source Tables as below:
> {code:java}
> CREATE TABLE test_streaming(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> CREATE TABLE test_streaming2(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming2',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> {code}
> Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, 
> timestamp:proctime()
> Third Step: test_streaming union all  test_streaming2 join dim like below:
> {code:java}
> SELECT r.vid,d.name,timestamp_from_long(r.ts)
> FROM (
> SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
> ) AS r,
> LATERAL TABLE (dim(r.proc)) AS d
> WHERE r.vid = d.vid;
> {code}
> Exception Detail: (if only use test-streaming or test-streaming2 join 
> temporary table function, the program run ok)
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>   at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>   at scala.collection.immutable.Range.foreach(Range.scala:166)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>   

[jira] [Updated] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157

2021-02-09 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-21345:

Description: 
First Step: Create 2 Source Tables as below:
{code:java}
CREATE TABLE test_streaming(
 vid BIGINT,
 ts BIGINT,
 proc AS proctime()
) WITH (
 'connector' = 'kafka',
 'topic' = 'test-streaming',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);
CREATE TABLE test_streaming2(
 vid BIGINT,
 ts BIGINT,
 proc AS proctime()
) WITH (
 'connector' = 'kafka',
 'topic' = 'test-streaming2',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);
{code}
Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, 
timestamp:proctime()

Third Step: test_streaming union all  test_streaming2 join dim like below:
{code:java}
SELECT r.vid,d.name,timestamp_from_long(r.ts)
FROM (
SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
) AS r,
LATERAL TABLE (dim(r.proc)) AS d
WHERE r.vid = d.vid;
{code}
Exception Detail: (if only use test-streaming or test-streaming2 join temporary 
table function, the program run ok)
{code:java}
Exception in thread "main" java.lang.NullPointerException
at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.immutable.Range.foreach(Range.scala:166)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at 

[jira] [Updated] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157

2021-02-09 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-21345:

Description: 
First Step: Create 2 Source Tables as below:
{code:java}
CREATE TABLE test_streaming(
 vid BIGINT,
 ts BIGINT,
 proc AS proctime()
) WITH (
 'connector' = 'kafka',
 'topic' = 'test-streaming',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);
CREATE TABLE test_streaming2(
 vid BIGINT,
 ts BIGINT,
 proc AS proctime()
) WITH (
 'connector' = 'kafka',
 'topic' = 'test-streaming2',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);
{code}
Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, 
timestamp:proctime()

Third Step: test_streaming union all  test_streaming2 join dim like below:
{code:java}
SELECT r.vid,d.name,timestamp_from_long(r.ts)
FROM (
SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
) AS r,
LATERAL TABLE (dim(r.proc)) AS d
WHERE r.vid = d.vid;
{code}
Exception Detail: (if only use test-streaming or test-streaming2 join temporary 
table function, the program run ok)
{code:java}
Exception in thread "main" java.lang.NullPointerException
at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.immutable.Range.foreach(Range.scala:166)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at 

[jira] [Updated] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157

2021-02-09 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-21345:

Description: 
First Step: Create 2 Source Tables as below:
{code:java}
CREATE TABLE test_streaming(
 vid BIGINT,
 ts BIGINT,
 proc AS proctime()
) WITH (
 'connector' = 'kafka',
 'topic' = 'test-streaming',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);
CREATE TABLE test_streaming2(
 vid BIGINT,
 ts BIGINT,
 proc AS proctime()
) WITH (
 'connector' = 'kafka',
 'topic' = 'test-streaming2',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);
{code}
Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, 
timestamp:proctime()

Third Step: test_streaming union all  test_streaming2 join dim like below:
{code:java}
SELECT r.vid,d.name,timestamp_from_long(r.ts)
FROM (
SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
) AS r,
LATERAL TABLE (dim(r.proc)) AS d
WHERE r.vid = d.vid;
{code}
Exception Detail: (if not union all, the program run ok)
{code:java}
Exception in thread "main" java.lang.NullPointerException
at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.immutable.Range.foreach(Range.scala:166)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at 

[jira] [Created] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157

2021-02-09 Thread lynn1.zhang (Jira)
lynn1.zhang created FLINK-21345:
---

 Summary: NullPointerException 
LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
 Key: FLINK-21345
 URL: https://issues.apache.org/jira/browse/FLINK-21345
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.12.1
 Environment: Planner: BlinkPlanner
Flink Version: 1.12.1_2.11
Java Version: 1.8
OS: mac os
Reporter: lynn1.zhang


First Step: Create 2 Source Tables as below:
{code:java}
CREATE TABLE test_streaming(
 vid BIGINT,
 ts BIGINT,
 proc AS proctime()
) WITH (
 'connector' = 'kafka',
 'topic' = 'test-streaming',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);
CREATE TABLE test_streaming2(
 vid BIGINT,
 ts BIGINT,
 proc AS proctime()
) WITH (
 'connector' = 'kafka',
 'topic' = 'test-streaming2',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);
{code}
Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, 
timestamp:proctime()

Third Step, test_streaming union all  test_streaming2 join dim like below:

 
{code:java}
SELECT r.vid,d.name,timestamp_from_long(r.ts)
FROM (
SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
) AS r,
LATERAL TABLE (dim(r.proc)) AS d
WHERE r.vid = d.vid;
{code}
Exception Detail:(if not union all, the program run ok)

 
{code:java}
Exception in thread "main" java.lang.NullPointerException
at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.immutable.Range.foreach(Range.scala:166)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 

[jira] [Comment Edited] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently

2019-07-10 Thread lynn1.zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881665#comment-16881665
 ] 

lynn1.zhang edited comment on FLINK-13072 at 7/10/19 6:33 AM:
--

[~srichter] Done


was (Author: zicat):
[~srichter] Please help to close, thx.

> RocksDBStateBachend is not thread safe and data loss silently
> -
>
> Key: FLINK-13072
> URL: https://issues.apache.org/jira/browse/FLINK-13072
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0, 1.8.1
>Reporter: lynn1.zhang
>Priority: Blocker
> Attachments: flink-demo.zip, image-2019-07-03-17-04-17-253.png
>
>
> I create 2 mapstates in one operator, then create 2 threads in apply method, 
> each thread operate each map state(the operator is same), the expect result 
> is that 2 state have the same result but not. I upload the code, please help 
> to try it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently

2019-07-10 Thread lynn1.zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang closed FLINK-13072.
---
Resolution: Not A Bug

> RocksDBStateBachend is not thread safe and data loss silently
> -
>
> Key: FLINK-13072
> URL: https://issues.apache.org/jira/browse/FLINK-13072
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0, 1.8.1
>Reporter: lynn1.zhang
>Priority: Blocker
> Attachments: flink-demo.zip, image-2019-07-03-17-04-17-253.png
>
>
> I create 2 mapstates in one operator, then create 2 threads in apply method, 
> each thread operate each map state(the operator is same), the expect result 
> is that 2 state have the same result but not. I upload the code, please help 
> to try it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently

2019-07-09 Thread lynn1.zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881665#comment-16881665
 ] 

lynn1.zhang commented on FLINK-13072:
-

[~srichter] Please help to close, thx.

> RocksDBStateBachend is not thread safe and data loss silently
> -
>
> Key: FLINK-13072
> URL: https://issues.apache.org/jira/browse/FLINK-13072
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0, 1.8.1
>Reporter: lynn1.zhang
>Priority: Blocker
> Attachments: flink-demo.zip, image-2019-07-03-17-04-17-253.png
>
>
> I create 2 mapstates in one operator, then create 2 threads in apply method, 
> each thread operate each map state(the operator is same), the expect result 
> is that 2 state have the same result but not. I upload the code, please help 
> to try it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently

2019-07-03 Thread lynn1.zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-13072:

Attachment: image-2019-07-03-17-04-17-253.png

> RocksDBStateBachend is not thread safe and data loss silently
> -
>
> Key: FLINK-13072
> URL: https://issues.apache.org/jira/browse/FLINK-13072
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0, 1.8.1
>Reporter: lynn1.zhang
>Priority: Blocker
> Attachments: flink-demo.zip, image-2019-07-03-17-04-17-253.png
>
>
> I create 2 mapstates in one operator, then create 2 threads in apply method, 
> each thread operate each map state(the operator is same), the expect result 
> is that 2 state have the same result but not. I upload the code, please help 
> to try it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently

2019-07-03 Thread lynn1.zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877657#comment-16877657
 ] 

lynn1.zhang commented on FLINK-13072:
-

I think we can modify the document 
[https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/state/StateBackend.html]

[~klion26]

!image-2019-07-03-17-04-17-253.png!

> RocksDBStateBachend is not thread safe and data loss silently
> -
>
> Key: FLINK-13072
> URL: https://issues.apache.org/jira/browse/FLINK-13072
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0, 1.8.1
>Reporter: lynn1.zhang
>Priority: Blocker
> Attachments: flink-demo.zip, image-2019-07-03-17-04-17-253.png
>
>
> I create 2 mapstates in one operator, then create 2 threads in apply method, 
> each thread operate each map state(the operator is same), the expect result 
> is that 2 state have the same result but not. I upload the code, please help 
> to try it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently

2019-07-03 Thread lynn1.zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877608#comment-16877608
 ] 

lynn1.zhang edited comment on FLINK-13072 at 7/3/19 9:02 AM:
-

[~yunta] from beginning, I also think the {{input.iterator()}} is not thread 
safe, but I try to print each element, all element print successfully. And I 
replace from  RocksDBStateBachend to  FsStateBackend, everything is right. I 
use lock object to lock the state.get() and state.put(), the result is also 
right.


was (Author: zicat):
from beginning, I also think the {{input.iterator()}} is not thread safe, but I 
try to print each element, all element print successfully. And I replace from  
RocksDBStateBachend to  FsStateBackend, everything is right.

> RocksDBStateBachend is not thread safe and data loss silently
> -
>
> Key: FLINK-13072
> URL: https://issues.apache.org/jira/browse/FLINK-13072
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0, 1.8.1
>Reporter: lynn1.zhang
>Priority: Blocker
> Attachments: flink-demo.zip
>
>
> I create 2 mapstates in one operator, then create 2 threads in apply method, 
> each thread operate each map state(the operator is same), the expect result 
> is that 2 state have the same result but not. I upload the code, please help 
> to try it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently

2019-07-03 Thread lynn1.zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877608#comment-16877608
 ] 

lynn1.zhang commented on FLINK-13072:
-

from beginning, I also think the {{input.iterator()}} is not thread safe, but I 
try to print each element, all element print successfully. And I replace from  
RocksDBStateBachend to  FsStateBackend, everything is right.

> RocksDBStateBachend is not thread safe and data loss silently
> -
>
> Key: FLINK-13072
> URL: https://issues.apache.org/jira/browse/FLINK-13072
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0, 1.8.1
>Reporter: lynn1.zhang
>Priority: Blocker
> Attachments: flink-demo.zip
>
>
> I create 2 mapstates in one operator, then create 2 threads in apply method, 
> each thread operate each map state(the operator is same), the expect result 
> is that 2 state have the same result but not. I upload the code, please help 
> to try it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently

2019-07-02 Thread lynn1.zhang (JIRA)
lynn1.zhang created FLINK-13072:
---

 Summary: RocksDBStateBachend is not thread safe and data loss 
silently
 Key: FLINK-13072
 URL: https://issues.apache.org/jira/browse/FLINK-13072
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.8.1, 1.8.0
Reporter: lynn1.zhang
 Attachments: flink-demo.zip

I create 2 mapstates in one operator, then create 2 threads in apply method, 
each thread operate each map state(the operator is same), the expect result is 
that 2 state have the same result but not. I upload the code, please help to 
try it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)