[jira] [Updated] (FLINK-22994) Improve the performance of nesting udf invoking
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lynn1.zhang updated FLINK-22994:

Description:
h1. BackGround
In some nesting udf invoking cases, Flink convert the udf result to external object and then convert to internalOrNull object as params for next udf invoking. The performance of some converter is poor.
h1. Test Params
Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz
UDF Introduction:
* ipip: input: int ip, output: map ip_info, map size = 14.
* ip_2_country: input map ip_info, output: string country.
* ip_2_region: input map ip_info, output: string region.
* ip_2_isp_domain: input map ip_info, output: string isp.
* ip_2_timezone: input map ip_info, output: string timezone.
h1. Performance Compare with MapMapConverter
h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 k/s
!image-2021-06-15-15-29-09-773.png!
h1. Goal
For some cases, skip toInternalOrNull & toExternal, Use the udf result directly.
h1. Performance Compare without MapMapConverter
h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s
!image-2021-06-15-15-30-14-775.png!

was:
h1. BackGround
In some nesting udf invoking cases, Flink convert the udf result to external object and then convert to internalOrNull object as params for next udf invoking. The performance of some converter is poor.
h1. Performance Compare with MapMapConverter
Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz
UDF Introduction:
* ipip: input: int ip, output: map ip_info, map size = 14.
* ip_2_country: input map ip_info, output: string country.
* ip_2_region: input map ip_info, output: string region.
* ip_2_isp_domain: input map ip_info, output: string isp.
* ip_2_timezone: input map ip_info, output: string timezone.
h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 k/s
!image-2021-06-15-15-29-09-773.png!
h1. Goal
For some cases, skip toInternalOrNull & toExternal, Use the udf result directly.
h1. Performance Compare without MapMapConverter
h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s
!image-2021-06-15-15-30-14-775.png!

> Improve the performance of nesting udf invoking
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.12.4
> Environment: h5.
> Reporter: lynn1.zhang
> Assignee: lynn1.zhang
> Priority: Major
> Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java,
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png,
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png,
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png,
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code,
> test.sql
>
>
> h1. BackGround
> In some nesting udf invoking cases, Flink convert the udf result to external
> object and then convert to internalOrNull object as params for next udf
> invoking. The performance of some converter is poor.
> h1. Test Params
> Source = Kafka, Schema = PB with snappy; Flink Slot = 1;
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218
> CPU @ 2.30GHz
> UDF Introduction:
> * ipip: input: int ip, output: map ip_info, map size = 14.
> * ip_2_country: input map ip_info, output: string country.
> * ip_2_region: input map ip_info, output: string region.
> * ip_2_isp_domain: input map ip_info, output: string isp.
> * ip_2_timezone: input map ip_info, output: string timezone.
> h1. Performance Compare with MapMapConverter
> h5. The throughput of nesting udf invoking with MapMapConverter(5 times):
> 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h1. Goal
> For some cases, skip toInternalOrNull & toExternal, Use the udf result
> directly.
> h1. Performance Compare without MapMapConverter
> h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>

--
This message was sent by Atlassian Jira (v8.3.4#803005)
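
The conversion round trip that FLINK-22994 wants to skip can be illustrated with a small, self-contained sketch. This is not Flink code: `InternalMap`, `toInternal`, `toExternal`, and the map contents are hypothetical stand-ins for the internal data format and the converter calls (toInternalOrNull / toExternal) named in the description. Only the UDF names `ipip` and `ip_2_country` and the map size of 14 come from the issue. The sketch only shows that the nested call returns the same value with or without the two conversions, so the conversions are pure overhead in this case.

```java
import java.util.HashMap;
import java.util.Map;

public class NestedUdfSketch {

    /** Hypothetical internal form: parallel arrays instead of a java.util.Map. */
    static final class InternalMap {
        final String[] keys;
        final String[] values;

        InternalMap(String[] keys, String[] values) {
            this.keys = keys;
            this.values = values;
        }
    }

    /** Counts conversions so the two call paths can be compared. */
    static int conversions = 0;

    /** Stand-in for a toInternalOrNull-style conversion (assumption, not Flink's code). */
    static InternalMap toInternal(Map<String, String> external) {
        conversions++;
        String[] keys = new String[external.size()];
        String[] values = new String[external.size()];
        int i = 0;
        for (Map.Entry<String, String> e : external.entrySet()) {
            keys[i] = e.getKey();
            values[i] = e.getValue();
            i++;
        }
        return new InternalMap(keys, values);
    }

    /** Stand-in for a toExternal-style conversion (assumption, not Flink's code). */
    static Map<String, String> toExternal(InternalMap internal) {
        conversions++;
        Map<String, String> external = new HashMap<>();
        for (int i = 0; i < internal.keys.length; i++) {
            external.put(internal.keys[i], internal.values[i]);
        }
        return external;
    }

    /** Stand-in for the ipip UDF: returns a 14-entry map with made-up values. */
    static Map<String, String> ipip(int ip) {
        Map<String, String> info = new HashMap<>();
        info.put("country", "country-" + ip);
        info.put("region", "region-" + ip);
        info.put("isp", "isp-" + ip);
        info.put("timezone", "tz-" + ip);
        for (int i = info.size(); i < 14; i++) {
            info.put("field" + i, "value" + i);
        }
        return info;
    }

    /** Stand-in for the ip_2_country UDF. */
    static String ip2Country(Map<String, String> ipInfo) {
        return ipInfo.get("country");
    }

    /** Nested call with the result converted to internal form and back. */
    static String withRoundTrip(int ip) {
        InternalMap internal = toInternal(ipip(ip));
        return ip2Country(toExternal(internal));
    }

    /** Nested call that hands the first UDF's result straight to the next UDF. */
    static String direct(int ip) {
        return ip2Country(ipip(ip));
    }

    public static void main(String[] args) {
        conversions = 0;
        String viaRoundTrip = withRoundTrip(7);
        int roundTripConversions = conversions;

        conversions = 0;
        String viaDirect = direct(7);

        System.out.println("same result: " + viaRoundTrip.equals(viaDirect));
        System.out.println("conversions: " + roundTripConversions + " vs " + conversions);
    }
}
```

In a query that applies four lookups to the same `ipip` result, each lookup would pay for a copy of all 14 entries in a model like this one, which is consistent with the issue's remark that some converters perform poorly.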
[jira] [Updated] (FLINK-22994) Improve the performance of nesting udf invoking
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Description: h1. BackGround In some nesting udf invoking cases, Flink convert the udf result to external object and then convert to internalOrNull object as params for next udf invoking. The performance of some converter is poor. h1. Performance Compare with MapMapConverter Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 k/s !image-2021-06-15-15-29-09-773.png! h1. Goal For some cases, skip toInternalOrNull & toExternal, Use the udf result directly. h1. Performance Compare without MapMapConverter h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s !image-2021-06-15-15-30-14-775.png! was: h1. BackGround In some nesting udf invoking cases, Flink convert the udf result to external object and then convert to internalOrNull object as params for next udf invoking. The performance of some converter is poor. h1. Performance Compare with MapMapConverter & without MapMapConverter Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. 
* ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 k/s !image-2021-06-15-15-29-09-773.png! h1. Goal For some cases, skip toInternalOrNull h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s !image-2021-06-15-15-30-14-775.png! > Improve the performance of nesting udf invoking > --- > > Key: FLINK-22994 > URL: https://issues.apache.org/jira/browse/FLINK-22994 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.12.4 > Environment: h5. >Reporter: lynn1.zhang >Assignee: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: StringConverterTest.java, Test.java, > image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, > image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, > image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, > image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, > test.sql > > > h1. BackGround > In some nesting udf invoking cases, Flink convert the udf result to external > object and then convert to internalOrNull object as params for next udf > invoking. The performance of some converter is poor. > h1. Performance Compare with MapMapConverter > Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; > taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 > CPU @ 2.30GHz > UDF Introduction: > * ipip: input: int ip, output: map ip_info, map size = 14. > * ip_2_country: input map ip_info, output: string country. > * ip_2_region: input map ip_info, output: string region. > * ip_2_isp_domain: input map ip_info, output: string isp. > * ip_2_timezone: input map ip_info, output: string timezone. > h5. The throughput of nesting udf invoking with MapMapConverter(5 times): > 41.42 k/s > !image-2021-06-15-15-29-09-773.png! > h1. 
Goal > For some cases, skip toInternalOrNull & toExternal, Use the udf result > directly. > h1. Performance Compare without MapMapConverter > h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s > !image-2021-06-15-15-30-14-775.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22994) Improve the performance of nesting udf invoking
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Description: h1. BackGround In some nesting udf invoking cases, Flink convert the udf result to external object and then convert to internalOrNull object as params for next udf invoking. The performance of some converter is poor. h1. Performance Compare with MapMapConverter & without MapMapConverter Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 k/s !image-2021-06-15-15-29-09-773.png! h1. Goal For some cases, skip toInternalOrNull h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s !image-2021-06-15-15-30-14-775.png! was: h1. BackGround In some nesting udf invoking cases, Flink convert the udf result to external object and then convert to internalOrNull object as params for next udf invoking. The performance of some converter is poor. h1. Performance Compare with MapMapConverter & without MapMapConverter Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. 
The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 k/s !image-2021-06-15-15-29-09-773.png! h1. Goal For some cases, skip the h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s !image-2021-06-15-15-30-14-775.png! > Improve the performance of nesting udf invoking > --- > > Key: FLINK-22994 > URL: https://issues.apache.org/jira/browse/FLINK-22994 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.12.4 > Environment: h5. >Reporter: lynn1.zhang >Assignee: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: StringConverterTest.java, Test.java, > image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, > image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, > image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, > image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, > test.sql > > > h1. BackGround > In some nesting udf invoking cases, Flink convert the udf result to external > object and then convert to internalOrNull object as params for next udf > invoking. The performance of some converter is poor. > h1. Performance Compare with MapMapConverter & without MapMapConverter > Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; > taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 > CPU @ 2.30GHz > UDF Introduction: > * ipip: input: int ip, output: map ip_info, map size = 14. > * ip_2_country: input map ip_info, output: string country. > * ip_2_region: input map ip_info, output: string region. > * ip_2_isp_domain: input map ip_info, output: string isp. > * ip_2_timezone: input map ip_info, output: string timezone. > h5. The throughput of nesting udf invoking with MapMapConverter(5 times): > 41.42 k/s > !image-2021-06-15-15-29-09-773.png! > h1. Goal > For some cases, skip toInternalOrNull > h5. 
The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s > !image-2021-06-15-15-30-14-775.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22994) Improve the performance of nesting udf invoking
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Description: h1. BackGround In some nesting udf invoking cases, Flink convert the udf result to external object and then convert to internalOrNull object as params for next udf invoking. The performance of some converter is poor. h1. Performance Compare with MapMapConverter & without MapMapConverter Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 k/s !image-2021-06-15-15-29-09-773.png! h1. Goal For some cases, skip the h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s !image-2021-06-15-15-30-14-775.png! was: h1. BackGround In some nesting udf invoking cases, Flink convert the udf result to external object and then convert to internalOrNull object as params for next udf invoking h1. Performance Compare with MapMapConverter & without MapMapConverter Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. 
The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s !image-2021-06-15-15-30-14-775.png! > Improve the performance of nesting udf invoking > --- > > Key: FLINK-22994 > URL: https://issues.apache.org/jira/browse/FLINK-22994 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.12.4 > Environment: h5. >Reporter: lynn1.zhang >Assignee: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: StringConverterTest.java, Test.java, > image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, > image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, > image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, > image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, > test.sql > > > h1. BackGround > In some nesting udf invoking cases, Flink convert the udf result to external > object and then convert to internalOrNull object as params for next udf > invoking. The performance of some converter is poor. > h1. Performance Compare with MapMapConverter & without MapMapConverter > Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; > taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 > CPU @ 2.30GHz > UDF Introduction: > * ipip: input: int ip, output: map ip_info, map size = 14. > * ip_2_country: input map ip_info, output: string country. > * ip_2_region: input map ip_info, output: string region. > * ip_2_isp_domain: input map ip_info, output: string isp. > * ip_2_timezone: input map ip_info, output: string timezone. > h5. The throughput of nesting udf invoking with MapMapConverter(5 times): > 41.42 k/s > !image-2021-06-15-15-29-09-773.png! > h1. Goal > For some cases, skip the > h5. 
The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s > !image-2021-06-15-15-30-14-775.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22994) Improve the performance of nesting udf invoking
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lynn1.zhang updated FLINK-22994:

Summary: Improve the performance of nesting udf invoking (was: Improve the performance of invoking nesting udf)

> Improve the performance of nesting udf invoking
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.12.4
> Environment: h5.
> Reporter: lynn1.zhang
> Assignee: lynn1.zhang
> Priority: Major
> Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java,
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png,
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png,
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png,
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code,
> test.sql
>
>
> h1. BackGround
> In some nesting udf invoking cases, Flink convert the udf result to external
> object and then convert to internalOrNull object as params for next udf
> invoking
> h1. Performance Compare with MapMapConverter & without MapMapConverter
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1;
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218
> CPU @ 2.30GHz
> UDF Introduction:
> * ipip: input: int ip, output: map ip_info, map size = 14.
> * ip_2_country: input map ip_info, output: string country.
> * ip_2_region: input map ip_info, output: string region.
> * ip_2_isp_domain: input map ip_info, output: string isp.
> * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput of nesting udf invoking with MapMapConverter(5 times):
> 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Description: h1. BackGround In some nesting udf invoking cases, Flink convert the udf result to external object and then convert to internalOrNull object as params for next udf invoking h1. Performance Compare with MapMapConverter & without MapMapConverter Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput of nesting udf invoking with MapMapConverter(5 times): 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s !image-2021-06-15-15-30-14-775.png! was: h1. BackGround In some nesting udf invoking cases, Flink convert the udf result to external object and then convert to internalOrNull object as params for next udf invoking h1. Performance Compare with MapMapConverter & without MapMapConverter Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. 
The throughput with udf nesting invoke after this issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! > Improve the performance of invoking nesting udf > --- > > Key: FLINK-22994 > URL: https://issues.apache.org/jira/browse/FLINK-22994 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.12.4 > Environment: h5. >Reporter: lynn1.zhang >Assignee: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: StringConverterTest.java, Test.java, > image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, > image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, > image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, > image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, > test.sql > > > h1. BackGround > In some nesting udf invoking cases, Flink convert the udf result to external > object and then convert to internalOrNull object as params for next udf > invoking > h1. Performance Compare with MapMapConverter & without MapMapConverter > Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; > taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 > CPU @ 2.30GHz > UDF Introduction: > * ipip: input: int ip, output: map ip_info, map size = 14. > * ip_2_country: input map ip_info, output: string country. > * ip_2_region: input map ip_info, output: string region. > * ip_2_isp_domain: input map ip_info, output: string isp. > * ip_2_timezone: input map ip_info, output: string timezone. > h5. The throughput of nesting udf invoking with MapMapConverter(5 times): > 41.42 k/s > !image-2021-06-15-15-29-09-773.png! > h5. The throughput of nesting udf invoking without MapMapConverter: 174.41 k/s > !image-2021-06-15-15-30-14-775.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Description: h1. BackGround In some nesting udf invoking cases, Flink convert the udf result to external object and then convert to internalOrNull object as params for next udf invoking h1. Performance Compare with MapMapConverter & without MapMapConverter Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after this issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! was: h1. BackGround In some nesting udf invoking cases, Flink convert the udf result to external object and then convert to internalOrNull object as params for next udf invoking h1. Performance Compare with MapMapConverter & without MapMapConverter Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput without udf invoke: 764.50 k/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invoke: 183.24 k/s !image-2021-06-15-15-42-08-065.png! h5. 
The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after this issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! > Improve the performance of invoking nesting udf > --- > > Key: FLINK-22994 > URL: https://issues.apache.org/jira/browse/FLINK-22994 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.12.4 > Environment: h5. >Reporter: lynn1.zhang >Assignee: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: StringConverterTest.java, Test.java, > image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, > image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, > image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, > image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, > test.sql > > > h1. BackGround > In some nesting udf invoking cases, Flink convert the udf result to external > object and then convert to internalOrNull object as params for next udf > invoking > h1. Performance Compare with MapMapConverter & without MapMapConverter > Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; > taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 > CPU @ 2.30GHz > UDF Introduction: > * ipip: input: int ip, output: map ip_info, map size = 14. > * ip_2_country: input map ip_info, output: string country. > * ip_2_region: input map ip_info, output: string region. > * ip_2_isp_domain: input map ip_info, output: string isp. > * ip_2_timezone: input map ip_info, output: string timezone. > h5. The throughput with udf nesting invoke: 41.42 k/s > !image-2021-06-15-15-29-09-773.png! > h5. The throughput with udf nesting invoke after this issue: 174.41 k/s > !image-2021-06-15-15-30-14-775.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Description: h1. BackGround In some nesting udf invoking cases, Flink convert the udf result to external object and then convert to internalOrNull object as params for next udf invoking h1. Performance Compare with MapMapConverter & without MapMapConverter Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput without udf invoke: 764.50 k/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invoke: 183.24 k/s !image-2021-06-15-15-42-08-065.png! h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after this issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! was: h1. BackGround Flink maintain the udf result as BinaryData, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be toInternalOrNull and toExternal. Below is the Generated Code !image-2021-06-15-15-18-12-619.png! This issue will improve it as below !image-2021-06-15-15-19-01-103.png! h1. Performance Compare Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. 
* ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput without udf invoke: 764.50 k/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invoke: 183.24 k/s !image-2021-06-15-15-42-08-065.png! h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after this issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! > Improve the performance of invoking nesting udf > --- > > Key: FLINK-22994 > URL: https://issues.apache.org/jira/browse/FLINK-22994 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.12.4 > Environment: h5. >Reporter: lynn1.zhang >Assignee: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: StringConverterTest.java, Test.java, > image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, > image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, > image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, > image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, > test.sql > > > h1. BackGround > In some nesting udf invoking cases, Flink convert the udf result to external > object and then convert to internalOrNull object as params for next udf > invoking > h1. Performance Compare with MapMapConverter & without MapMapConverter > Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; > taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 > CPU @ 2.30GHz > UDF Introduction: > * ipip: input: int ip, output: map ip_info, map size = 14. > * ip_2_country: input map ip_info, output: string country. > * ip_2_region: input map ip_info, output: string region. > * ip_2_isp_domain: input map ip_info, output: string isp. > * ip_2_timezone: input map ip_info, output: string timezone. > h5. 
The throughput without udf invoke: 764.50 k/s > !image-2021-06-15-15-27-26-739.png! > h5. The throughput with udf invoke: 183.24 k/s > !image-2021-06-15-15-42-08-065.png! > h5. The throughput with udf nesting invoke: 41.42 k/s > !image-2021-06-15-15-29-09-773.png! > h5. The throughput with udf nesting invoke after this issue: 174.41 k/s > !image-2021-06-15-15-30-14-775.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
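
The four throughput figures quoted in this description can be put in proportion with a little arithmetic. The sketch below is only a calculator over the numbers reported in the issue; the class and constant names are made up for illustration, and reading "with udf invoke" as a single, non-nested UDF call is an assumption.

```java
import java.util.Locale;

public class ThroughputRatios {

    // Figures quoted in the issue description, in thousand records per second.
    static final double NO_UDF = 764.50;        // "without udf invoke"
    static final double SINGLE_UDF = 183.24;    // "with udf invoke" (assumed non-nested)
    static final double NESTED_BEFORE = 41.42;  // "with udf nesting invoke"
    static final double NESTED_AFTER = 174.41;  // "with udf nesting invoke after this issue"

    static double ratio(double numerator, double denominator) {
        return numerator / denominator;
    }

    public static void main(String[] args) {
        // Speedup of the nested case once the conversions are skipped.
        System.out.println(String.format(Locale.ROOT,
                "nested speedup: %.2fx", ratio(NESTED_AFTER, NESTED_BEFORE)));
        // How close the improved nested case gets to the non-nested UDF case.
        System.out.println(String.format(Locale.ROOT,
                "nested after vs. single udf: %.1f%%", 100 * ratio(NESTED_AFTER, SINGLE_UDF)));
        // Cost of calling a UDF at all, relative to no UDF.
        System.out.println(String.format(Locale.ROOT,
                "single udf vs. no udf: %.1f%%", 100 * ratio(SINGLE_UDF, NO_UDF)));
    }
}
```

On these figures the change gives roughly a 4.2x speedup for the nested case and brings it to about 95% of the throughput measured with a UDF call that is not nested.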
[jira] [Comment Edited] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364691#comment-17364691 ]

lynn1.zhang edited comment on FLINK-22994 at 6/17/21, 4:42 AM:
---

{code:java}
public static void main(String[] args) {
    final int totalTestCount = 100 * 1;
    System.out.println(BinaryStringData.fromString(""));
    System.out.println(
            "Total test count:"
                    + totalTestCount
                    + ", The spend with MapMapConverter "
                    + testWithMapMapConverter(totalTestCount)
                    + " ms.");
    System.out.println(
            "Total test count:"
                    + totalTestCount
                    + ", The spend without Converter "
                    + testWithOutMapMapConverter(totalTestCount)
                    + " ms.");
}
{code}
Amazing code, Thank you for your advice. I think create another GeneratedExpression that extend GeneratedExpression is a good idea.

was (Author: zicat):
public static void main(String[] args) {final int totalTestCount = 100 * 1;System.out.println(BinaryStringData.fromString("")); System.out.println("Total test count:"+ totalTestCount + ", The spend with MapMapConverter "+ testWithMapMapConverter(totalTestCount) + " ms.");System.out.println("Total test count:"+ totalTestCount + ", The spend without Converter "+ testWithOutMapMapConverter(totalTestCount) + " ms."); }
Amazing code, Thank you for your advice. I think create another GeneratedExpression that extend GeneratedExpression is a good idea.

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.12.4
> Environment: h5.
>Reporter: lynn1.zhang >Assignee: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: StringConverterTest.java, Test.java, > image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, > image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, > image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, > image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, > test.sql > > > h1. BackGround > Flink maintain the udf result as BinaryData, like BinaryStringData. When > invoking nesting udf like select useless(int_ip_2_string(ip)), the result of > int_ip_2_string(ip) will be toInternalOrNull and toExternal. > Below is the Generated Code > !image-2021-06-15-15-18-12-619.png! This issue will improve it as below > !image-2021-06-15-15-19-01-103.png! > h1. Performance Compare > Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; > taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 > CPU @ 2.30GHz > UDF Introduction: > * ipip: input: int ip, output: map ip_info, map size = 14. > * ip_2_country: input map ip_info, output: string country. > * ip_2_region: input map ip_info, output: string region. > * ip_2_isp_domain: input map ip_info, output: string isp. > * ip_2_timezone: input map ip_info, output: string timezone. > h5. The throughput without udf invoke: 764.50 k/s > !image-2021-06-15-15-27-26-739.png! > h5. The throughput with udf invoke: 183.24 k/s > !image-2021-06-15-15-42-08-065.png! > h5. The throughput with udf nesting invoke: 41.42 k/s > !image-2021-06-15-15-29-09-773.png! > h5. The throughput with udf nesting invoke after this issue: 174.41 k/s > !image-2021-06-15-15-30-14-775.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364691#comment-17364691 ]

lynn1.zhang commented on FLINK-22994:
-
public static void main(String[] args) {
    final int totalTestCount = 100 * 1;
    System.out.println(BinaryStringData.fromString(""));
    System.out.println("Total test count:" + totalTestCount + ", The spend with MapMapConverter " + testWithMapMapConverter(totalTestCount) + " ms.");
    System.out.println("Total test count:" + totalTestCount + ", The spend without Converter " + testWithOutMapMapConverter(totalTestCount) + " ms.");
}

Amazing code, thank you for your advice. I think creating another GeneratedExpression that extends GeneratedExpression is a good idea.
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lynn1.zhang updated FLINK-22994:

Description:
h1. BackGround
Flink maintain the udf result as BinaryData, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be toInternalOrNull and toExternal.
Below is the Generated Code
!image-2021-06-15-15-18-12-619.png!
This issue will improve it as below
!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare
Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz
UDF Introduction:
* ipip: input: int ip, output: map ip_info, map size = 14.
* ip_2_country: input map ip_info, output: string country.
* ip_2_region: input map ip_info, output: string region.
* ip_2_isp_domain: input map ip_info, output: string isp.
* ip_2_timezone: input map ip_info, output: string timezone.
h5. The throughput without udf invoke: 764.50 k/s
!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s
!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s
!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
!image-2021-06-15-15-30-14-775.png!

was:
h1. BackGround
Flink maintain the udf result as BinaryData, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization.
Below is the Generated Code
!image-2021-06-15-15-18-12-619.png!
This issue will improve it as below
!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare
Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz
UDF Introduction:
* ipip: input: int ip, output: map ip_info, map size = 14.
* ip_2_country: input map ip_info, output: string country.
* ip_2_region: input map ip_info, output: string region.
* ip_2_isp_domain: input map ip_info, output: string isp.
* ip_2_timezone: input map ip_info, output: string timezone.
h5. The throughput without udf invoke: 764.50 k/s
!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s
!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s
!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
!image-2021-06-15-15-30-14-775.png!
[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364666#comment-17364666 ]

lynn1.zhang commented on FLINK-22994:
-
[^StringConverterTest.java]

I also tested the StringStringConverter. Its performance is not good either.

Total test count:100, The spend with MapMapConverter 7243 ms.
Total test count:100, The spend without Converter 72 ms.
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Attachment: StringConverterTest.java
[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364658#comment-17364658 ]

lynn1.zhang commented on FLINK-22994:
-
Hi [~lzljs3620320], ipip returns a map, so the converter is the MapMapConverter, not the StringStringConverter.

[^Test.java] is my benchmark case, which shows the cost difference between running with and without the converter.

Total test count:100, The spend with MapMapConverter 12160 ms.
Total test count:100, The spend without Converter 52 ms.
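The benchmark described in the comment above can be approximated with a small, dependency-free harness. This is only a sketch under stated assumptions: the attached Test.java is not shown in the thread, so `roundTrip` is a hypothetical stand-in that copies a map out and back in (it is not Flink's MapMapConverter), and the class and method names are made up for illustration. The 14-entry map mirrors the map size given in the issue description.

```java
import java.util.HashMap;
import java.util.Map;

final class ConverterCostSketch {
    // Written on every iteration so the JIT cannot discard the measured work.
    static volatile int sink;

    /** Builds a 14-entry map, the ip_info size stated in the issue description. */
    static Map<String, String> sampleIpInfo() {
        Map<String, String> info = new HashMap<>();
        for (int i = 0; i < 14; i++) {
            info.put("key" + i, "value" + i);
        }
        return info;
    }

    /** Hypothetical converter round trip: map -> parallel arrays -> new map. */
    static Map<String, String> roundTrip(Map<String, String> external) {
        String[] keys = new String[external.size()];
        String[] values = new String[external.size()];
        int i = 0;
        for (Map.Entry<String, String> e : external.entrySet()) {
            keys[i] = e.getKey();
            values[i] = e.getValue();
            i++;
        }
        Map<String, String> copy = new HashMap<>();
        for (int j = 0; j < keys.length; j++) {
            copy.put(keys[j], values[j]);
        }
        return copy;
    }

    /** Elapsed milliseconds for count lookups that each pay for a round trip first. */
    static long testWithConverter(int count) {
        Map<String, String> info = sampleIpInfo();
        long start = System.nanoTime();
        for (int i = 0; i < count; i++) {
            sink = roundTrip(info).get("key0").length();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    /** Elapsed milliseconds for count lookups that read the map directly. */
    static long testWithoutConverter(int count) {
        Map<String, String> info = sampleIpInfo();
        long start = System.nanoTime();
        for (int i = 0; i < count; i++) {
            sink = info.get("key0").length();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        final int totalTestCount = 200_000;
        // Warm-up pass so both paths are JIT-compiled before the timed runs.
        testWithConverter(totalTestCount);
        testWithoutConverter(totalTestCount);
        System.out.println("Total test count:" + totalTestCount
                + ", with round trip " + testWithConverter(totalTestCount) + " ms.");
        System.out.println("Total test count:" + totalTestCount
                + ", without round trip " + testWithoutConverter(totalTestCount) + " ms.");
    }
}
```

The absolute numbers depend on the JVM and hardware and will not match the figures quoted in the thread. The sketch only shows the shape of the comparison: the same lookup, with and without a per-call copy of the map.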
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Attachment: Test.java
[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364195#comment-17364195 ]

lynn1.zhang commented on FLINK-22994:
-
[~lzljs3620320] I uploaded [^test.sql] together with [^old_projection_code] and [^new_projection_code].

Before this PR, the result of the ipip udf (map type) goes through converter$4.toInternalOrNull and converter$4.toExternal 4 times. In [^new_projection_code], converter$4.toInternalOrNull and converter$4.toExternal are never invoked.
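The difference between the two projections described in the comment above can be illustrated with a plain-Java sketch that counts conversions. Everything here is an assumption made for illustration: `CountingMapConverter` is a hypothetical stand-in and not Flink's converter, the udfs are reduced to map lookups, only 4 of the 14 ip_info entries are modelled, and ipip is assumed to be evaluated once per outer udf. The attached old_projection_code and new_projection_code are not reproduced in the thread, so this does not claim to match the generated code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a map converter; it mimics the copy work and counts calls. */
final class CountingMapConverter {
    int toInternalCalls;
    int toExternalCalls;

    /** External Map -> "internal" form, modelled here as two parallel arrays. */
    String[][] toInternalOrNull(Map<String, String> external) {
        if (external == null) {
            return null;
        }
        toInternalCalls++;
        String[][] internal = new String[2][external.size()];
        int i = 0;
        for (Map.Entry<String, String> e : external.entrySet()) {
            internal[0][i] = e.getKey();
            internal[1][i] = e.getValue();
            i++;
        }
        return internal;
    }

    /** "Internal" form -> external Map, copying every entry again. */
    Map<String, String> toExternal(String[][] internal) {
        toExternalCalls++;
        Map<String, String> external = new HashMap<>();
        for (int i = 0; i < internal[0].length; i++) {
            external.put(internal[0][i], internal[1][i]);
        }
        return external;
    }
}

final class NestedUdfProjectionSketch {
    static final String[] FIELDS = {"country", "region", "isp", "timezone"};

    /** Hypothetical ipip udf: int ip -> map ip_info. */
    static Map<String, String> ipip(int ip) {
        Map<String, String> info = new LinkedHashMap<>();
        for (String field : FIELDS) {
            info.put(field, field + "-" + ip);
        }
        return info;
    }

    /** Old-style projection: the map is converted to internal and back before each outer udf. */
    static String[] projectWithRoundTrip(int ip, CountingMapConverter converter) {
        String[] out = new String[FIELDS.length];
        for (int i = 0; i < FIELDS.length; i++) {
            String[][] internal = converter.toInternalOrNull(ipip(ip));
            Map<String, String> external = converter.toExternal(internal);
            out[i] = external.get(FIELDS[i]); // outer udf, e.g. ip_2_country
        }
        return out;
    }

    /** New-style projection: each outer udf reads the ipip result directly. */
    static String[] projectDirect(int ip) {
        String[] out = new String[FIELDS.length];
        for (int i = 0; i < FIELDS.length; i++) {
            out[i] = ipip(ip).get(FIELDS[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        CountingMapConverter converter = new CountingMapConverter();
        String[] slow = projectWithRoundTrip(42, converter);
        String[] fast = projectDirect(42);
        System.out.println("same result: " + Arrays.equals(slow, fast));
        System.out.println("toInternalOrNull calls: " + converter.toInternalCalls);
        System.out.println("toExternal calls: " + converter.toExternalCalls);
    }
}
```

With four outer udfs, the round-trip variant performs four conversions in each direction while the direct variant performs none, and both return the same values. That is the saving the comment attributes to the new projection code.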
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Attachment: old_projection_code new_projection_code
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Attachment: test.sql
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lynn1.zhang updated FLINK-22994:

Description:
h1. BackGround
Flink maintain the udf result as BinaryData, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization.
Below is the Generated Code
!image-2021-06-15-15-18-12-619.png!
This issue will improve it as below
!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare
Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz
UDF Introduction:
* ipip: input: int ip, output: map ip_info, map size = 14.
* ip_2_country: input map ip_info, output: string country.
* ip_2_region: input map ip_info, output: string region.
* ip_2_isp_domain: input map ip_info, output: string isp.
* ip_2_timezone: input map ip_info, output: string timezone.
h5. The throughput without udf invoke: 764.50 k/s
!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s
!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s
!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
!image-2021-06-15-15-30-14-775.png!

was:
h1. BackGround
Flink maintain the udf result as BinaryData, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization.
Below is the Generated Code
!image-2021-06-15-15-18-12-619.png!
this issue want to improve it as below
!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare
Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz
UDF Introduction:
* ipip: input: int ip, output: map ip_info, map size = 14.
* ip_2_country: input map ip_info, output: string country.
* ip_2_region: input map ip_info, output: string region.
* ip_2_isp_domain: input map ip_info, output: string isp.
* ip_2_timezone: input map ip_info, output: string timezone.
h5. The throughput without udf invoke: 764.50 k/s
!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s
!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s
!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
!image-2021-06-15-15-30-14-775.png!

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.12.4
> Environment: h5.
> Reporter: lynn1.zhang
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lynn1.zhang updated FLINK-22994:

Description:
h1. BackGround
Flink maintain the udf result as BinaryData, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization.
Below is the Generated Code
!image-2021-06-15-15-18-12-619.png!
this issue want to improve it as below
!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare
Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz
UDF Introduction:
* ipip: input: int ip, output: map ip_info, map size = 14.
* ip_2_country: input map ip_info, output: string country.
* ip_2_region: input map ip_info, output: string region.
* ip_2_isp_domain: input map ip_info, output: string isp.
* ip_2_timezone: input map ip_info, output: string timezone.
h5. The throughput without udf invoke: 764.50 k/s
!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s
!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s
!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
!image-2021-06-15-15-30-14-775.png!

was:
h1. BackGround
Flink maintain the udf result as BinaryData, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization.
Below is the Generated Code
!image-2021-06-15-15-18-12-619.png!
this issue want to improve it as below
!image-2021-06-15-15-19-01-103.png!
h1. Performance Compare
Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz
UDF Introduction:
* ipip: input: int ip, output: map ip_info, map size = 14.
* ip_2_country: input map ip_info, output: string country.
* ip_2_region: input map ip_info, output: string region.
* ip_2_isp_domain: input map ip_info, output: string isp.
* ip_2_timezone: input map ip_info, output: string timezone.
h5. The throughput without udf invoke: 764.50 w/s
!image-2021-06-15-15-27-26-739.png!
h5. The throughput with udf invoke: 183.24 k/s
!image-2021-06-15-15-42-08-065.png!
h5. The throughput with udf nesting invoke: 41.42 k/s
!image-2021-06-15-15-29-09-773.png!
h5. The throughput with udf nesting invoke after this issue: 174.41 k/s
!image-2021-06-15-15-30-14-775.png!
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Description: h1. BackGround Flink maintain the udf result as BinaryData, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization. Below is the Generated Code !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below !image-2021-06-15-15-19-01-103.png! h1. Performance Compare Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput without udf invoke: 764.50 w/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invoke: 183.24 k/s !image-2021-06-15-15-42-08-065.png! h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after this issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! was: h1. BackGround Flink maintain the udf result as BinaryData, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization. Below is the Generated Code !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below !image-2021-06-15-15-19-01-103.png! h1. 
Performance Compare Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput without udf invoke: 764.50 w/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invoke: 183.24 k/s !image-2021-06-15-15-42-08-065.png! h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after the issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! > Improve the performance of invoking nesting udf > --- > > Key: FLINK-22994 > URL: https://issues.apache.org/jira/browse/FLINK-22994 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.12.4 > Environment: h5. >Reporter: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2021-06-15-15-18-12-619.png, > image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, > image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, > image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png > > > h1. BackGround > Flink maintain the udf result as BinaryData, like BinaryStringData. When > invoking nesting udf like select useless(int_ip_2_string(ip)), the result of > int_ip_2_string(ip) will be serialization and deserialization. > Below is the Generated Code > !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below > !image-2021-06-15-15-19-01-103.png! > h1. 
Performance Compare > Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; > taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 > CPU @ 2.30GHz > UDF Introduction: > * ipip: input: int ip, output: map ip_info, map size = 14. > * ip_2_country: input map ip_info, output: string country. > * ip_2_region: input map ip_info, output: string region. > * ip_2_isp_domain: input map ip_info, output: string isp. > * ip_2_timezone: input map ip_info, output: string timezone. > h5. The throughput without udf invoke: 764.50 w/s > !image-2021-06-15-15-27-26-739.png! > h5. The throughput with udf invoke: 183.24 k/s > !image-2021-06-15-15-42-08-065.png! > h5. The throughput with udf nesting invoke: 41.42 k/s > !image-2021-06-15-15-29-09-773.png! > h5. The throughput with udf nesting invoke after this issue: 174.41 k/s > !image-2021-06-15-15-30-14-775.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Description: h1. BackGround Flink maintain the udf result as BinaryData, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization. Below is the Generated Code !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below !image-2021-06-15-15-19-01-103.png! h1. Performance Compare Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput without udf invoke: 764.50 w/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invoke: 183.24 k/s !image-2021-06-15-15-42-08-065.png! h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after the issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! was: h1. BackGround Flink maintain the udf result as BinaryData, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization. Below is the Generated Code !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below !image-2021-06-15-15-19-01-103.png! h1. 
Performance Compare Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput without udf invoke: 764.50 w/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invoke: 183.24 k/s !image-2021-06-15-15-42-08-065.png! h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after the issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! > Improve the performance of invoking nesting udf > --- > > Key: FLINK-22994 > URL: https://issues.apache.org/jira/browse/FLINK-22994 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.12.4 > Environment: h5. >Reporter: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2021-06-15-15-18-12-619.png, > image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, > image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, > image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png > > > h1. BackGround > Flink maintain the udf result as BinaryData, like BinaryStringData. When > invoking nesting udf like select useless(int_ip_2_string(ip)), the result of > int_ip_2_string(ip) will be serialization and deserialization. > Below is the Generated Code > !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below > !image-2021-06-15-15-19-01-103.png! > h1. 
Performance Compare > Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; > taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 > CPU @ 2.30GHz > UDF Introduction: > * ipip: input: int ip, output: map ip_info, map size = 14. > * ip_2_country: input map ip_info, output: string country. > * ip_2_region: input map ip_info, output: string region. > * ip_2_isp_domain: input map ip_info, output: string isp. > * ip_2_timezone: input map ip_info, output: string timezone. > h5. The throughput without udf invoke: 764.50 w/s > !image-2021-06-15-15-27-26-739.png! > h5. The throughput with udf invoke: 183.24 k/s > !image-2021-06-15-15-42-08-065.png! > h5. The throughput with udf nesting invoke: 41.42 k/s > !image-2021-06-15-15-29-09-773.png! > h5. The throughput with udf nesting invoke after the issue: 174.41 k/s > !image-2021-06-15-15-30-14-775.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Description: h1. BackGround Flink maintain the udf result as BinaryData, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization. Below is the Generated Code !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below !image-2021-06-15-15-19-01-103.png! h1. Performance Compare Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput without udf invoke: 764.50 w/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invoke: 183.24 k/s !image-2021-06-15-15-42-08-065.png! h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after the issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! was: h1. BackGround Flink maintain the udf result as binary, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization. Below is the Generated Code !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below !image-2021-06-15-15-19-01-103.png! h1. 
Performance Compare Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput without udf invoke: 764.50 w/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invoke: 183.24 k/s !image-2021-06-15-15-42-08-065.png! h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after the issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! > Improve the performance of invoking nesting udf > --- > > Key: FLINK-22994 > URL: https://issues.apache.org/jira/browse/FLINK-22994 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.12.4 > Environment: h5. >Reporter: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2021-06-15-15-18-12-619.png, > image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, > image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, > image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png > > > h1. BackGround > Flink maintain the udf result as BinaryData, like BinaryStringData. When > invoking nesting udf like select useless(int_ip_2_string(ip)), the result of > int_ip_2_string(ip) will be serialization and deserialization. > Below is the Generated Code > !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below > !image-2021-06-15-15-19-01-103.png! > h1. 
Performance Compare > Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = > 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 > CPU @ 2.30GHz > UDF Introduction: > * ipip: input: int ip, output: map ip_info, map size = 14. > * ip_2_country: input map ip_info, output: string country. > * ip_2_region: input map ip_info, output: string region. > * ip_2_isp_domain: input map ip_info, output: string isp. > * ip_2_timezone: input map ip_info, output: string timezone. > h5. The throughput without udf invoke: 764.50 w/s > !image-2021-06-15-15-27-26-739.png! > h5. The throughput with udf invoke: 183.24 k/s > !image-2021-06-15-15-42-08-065.png! > h5. The throughput with udf nesting invoke: 41.42 k/s > !image-2021-06-15-15-29-09-773.png! > h5. The throughput with udf nesting invoke after the issue: 174.41 k/s > !image-2021-06-15-15-30-14-775.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Description: h1. BackGround Flink maintain the udf result as binary, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization. Below is the Generated Code !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below !image-2021-06-15-15-19-01-103.png! h1. Performance Compare Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput without udf invoke: 764.50 w/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invoke: 183.24 k/s !image-2021-06-15-15-42-08-065.png! h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after the issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! was: h1. BackGround Flink maintain the udf result as binary, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization. Below is the Generated Code !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below !image-2021-06-15-15-19-01-103.png! h1. 
Performance Compare Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput without udf invoke: 764.50 w/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invoke: 183.24 k/s !image-2021-06-15-15-28-28-137.png! h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after the issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! > Improve the performance of invoking nesting udf > --- > > Key: FLINK-22994 > URL: https://issues.apache.org/jira/browse/FLINK-22994 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.12.4 > Environment: h5. >Reporter: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2021-06-15-15-18-12-619.png, > image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, > image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, > image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png > > > h1. BackGround > Flink maintain the udf result as binary, like BinaryStringData. When invoking > nesting udf like select useless(int_ip_2_string(ip)), the result of > int_ip_2_string(ip) will be serialization and deserialization. > Below is the Generated Code > !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below > !image-2021-06-15-15-19-01-103.png! > h1. 
Performance Compare > Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = > 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 > CPU @ 2.30GHz > UDF Introduction: > * ipip: input: int ip, output: map ip_info, map size = 14. > * ip_2_country: input map ip_info, output: string country. > * ip_2_region: input map ip_info, output: string region. > * ip_2_isp_domain: input map ip_info, output: string isp. > * ip_2_timezone: input map ip_info, output: string timezone. > h5. The throughput without udf invoke: 764.50 w/s > !image-2021-06-15-15-27-26-739.png! > h5. The throughput with udf invoke: 183.24 k/s > !image-2021-06-15-15-42-08-065.png! > h5. The throughput with udf nesting invoke: 41.42 k/s > !image-2021-06-15-15-29-09-773.png! > h5. The throughput with udf nesting invoke after the issue: 174.41 k/s > !image-2021-06-15-15-30-14-775.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lynn1.zhang updated FLINK-22994:
    Attachment: image-2021-06-15-15-42-08-065.png

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.12.4
> Environment: h5.
> Reporter: lynn1.zhang
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png
>
>
> h1. BackGround
> Flink maintain the udf result as binary, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization.
> Below is the Generated Code
> !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Compare
> Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz
> UDF Introduction:
> * ipip: input: int ip, output: map ip_info, map size = 14.
> * ip_2_country: input map ip_info, output: string country.
> * ip_2_region: input map ip_info, output: string region.
> * ip_2_isp_domain: input map ip_info, output: string isp.
> * ip_2_timezone: input map ip_info, output: string timezone.
> h5. The throughput without udf invoke: 764.50 w/s
> !image-2021-06-15-15-27-26-739.png!
> h5. The throughput with udf invoke: 183.24 k/s
> !image-2021-06-15-15-28-28-137.png!
> h5. The throughput with udf nesting invoke: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. The throughput with udf nesting invoke after the issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf
[ https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-22994: Description: h1. BackGround Flink maintain the udf result as binary, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization. Below is the Generated Code !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below !image-2021-06-15-15-19-01-103.png! h1. Performance Compare Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input map ip_info, output: string country. * ip_2_region: input map ip_info, output: string region. * ip_2_isp_domain: input map ip_info, output: string isp. * ip_2_timezone: input map ip_info, output: string timezone. h5. The throughput without udf invoke: 764.50 w/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invoke: 183.24 k/s !image-2021-06-15-15-28-28-137.png! h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after the issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! was: h1. BackGround Flink maintain the udf result as binary, like BinaryStringData. When invoking nesting udf like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialization and deserialization. Below is the Generated Code !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below !image-2021-06-15-15-19-01-103.png! h1. 
Performance Compare Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip,output: map ip_info,map size = 14。 * ip_2_country: input map ip_info,output: string country。 * ip_2_region: input map ip_info,output: string region。 * ip_2_isp_domain: input map ip_info,output: string isp。 * ip_2_timezone: input map ip_info,output: string timezone。 h5. The throughput without udf invoke: 764.50 w/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invoke: 183.24 k/s !image-2021-06-15-15-28-28-137.png! h5. The throughput with udf nesting invoke: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with udf nesting invoke after the issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png! > Improve the performance of invoking nesting udf > --- > > Key: FLINK-22994 > URL: https://issues.apache.org/jira/browse/FLINK-22994 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.12.4 > Environment: h5. >Reporter: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2021-06-15-15-18-12-619.png, > image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, > image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, > image-2021-06-15-15-30-14-775.png > > > h1. BackGround > Flink maintain the udf result as binary, like BinaryStringData. When invoking > nesting udf like select useless(int_ip_2_string(ip)), the result of > int_ip_2_string(ip) will be serialization and deserialization. > Below is the Generated Code > !image-2021-06-15-15-18-12-619.png! this issue want to improve it as below > !image-2021-06-15-15-19-01-103.png! > h1. 
Performance Compare > Condition: Source = Kafka,Schema = PB with snappy;Flink Slot = > 1、taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 > CPU @ 2.30GHz > UDF Introduction: > * ipip: input: int ip, output: map ip_info, map size = 14. > * ip_2_country: input map ip_info, output: string country. > * ip_2_region: input map ip_info, output: string region. > * ip_2_isp_domain: input map ip_info, output: string isp. > * ip_2_timezone: input map ip_info, output: string timezone. > h5. The throughput without udf invoke: 764.50 w/s > !image-2021-06-15-15-27-26-739.png! > h5. The throughput with udf invoke: 183.24 k/s > !image-2021-06-15-15-28-28-137.png! > h5. The throughput with udf nesting invoke: 41.42 k/s > !image-2021-06-15-15-29-09-773.png! > h5. The throughput with udf nesting invoke after the issue: 174.41 k/s > !image-2021-06-15-15-30-14-775.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22994) Improve the performance of invoking nesting udf
lynn1.zhang created FLINK-22994:
---

Summary: Improve the performance of invoking nesting udf
Key: FLINK-22994
URL: https://issues.apache.org/jira/browse/FLINK-22994
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 1.12.4
Environment: h5.
Reporter: lynn1.zhang
Attachments: image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png

h1. Background

Flink maintains the udf result in binary form, such as BinaryStringData. When invoking nested udfs, as in select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) is serialized and then deserialized again before it is passed to the outer udf.

Below is the generated code:

!image-2021-06-15-15-18-12-619.png!

This issue wants to improve it as below:

!image-2021-06-15-15-19-01-103.png!

h1. Performance Compare

Condition: Source = Kafka, Schema = PB with snappy; Flink Slot = 1; taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz

UDF Introduction:
* ipip: input: int ip, output: map ip_info, map size = 14.
* ip_2_country: input map ip_info, output: string country.
* ip_2_region: input map ip_info, output: string region.
* ip_2_isp_domain: input map ip_info, output: string isp.
* ip_2_timezone: input map ip_info, output: string timezone.

h5. The throughput without udf invoke: 764.50 w/s
!image-2021-06-15-15-27-26-739.png!

h5. The throughput with udf invoke: 183.24 k/s
!image-2021-06-15-15-28-28-137.png!

h5. The throughput with udf nesting invoke: 41.42 k/s
!image-2021-06-15-15-29-09-773.png!

h5. The throughput with udf nesting invoke after the issue: 174.41 k/s
!image-2021-06-15-15-30-14-775.png!
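The generated code referenced in the description is only attached as images, so the following is a rough plain-Java sketch of the idea rather than Flink's actual code. It assumes a toy `BinaryString` wrapper standing in for an internal binary type such as BinaryStringData, and hypothetical helpers `toInternal` / `toExternal` that count conversions. The two udf names are taken from the example query; everything else is made up for illustration.

```java
import java.nio.charset.StandardCharsets;

public class NestedUdfSketch {
    /** Hypothetical stand-in for an internal binary string representation. */
    static final class BinaryString {
        final byte[] bytes;
        BinaryString(byte[] bytes) { this.bytes = bytes; }
    }

    /** Counts every conversion between the external and the internal form. */
    static int conversions = 0;

    // Assumed helper: external String -> internal binary form.
    static BinaryString toInternal(String s) {
        conversions++;
        return new BinaryString(s.getBytes(StandardCharsets.UTF_8));
    }

    // Assumed helper: internal binary form -> external String.
    static String toExternal(BinaryString b) {
        conversions++;
        return new String(b.bytes, StandardCharsets.UTF_8);
    }

    /** Toy inner udf: formats a 32-bit integer as a dotted IPv4 string. */
    static String intIp2String(int ip) {
        return ((ip >>> 24) & 0xFF) + "." + ((ip >>> 16) & 0xFF) + "."
                + ((ip >>> 8) & 0xFF) + "." + (ip & 0xFF);
    }

    /** Toy outer udf: returns its argument unchanged. */
    static String useless(String s) {
        return s;
    }

    /** Behaviour described in the issue: the inner result is converted to
     *  the internal form and back before the outer udf sees it. */
    static String nestedWithRoundTrip(int ip) {
        BinaryString internal = toInternal(intIp2String(ip));
        return useless(toExternal(internal));
    }

    /** Proposed behaviour: hand the inner result to the outer udf directly. */
    static String nestedDirect(int ip) {
        return useless(intIp2String(ip));
    }

    public static void main(String[] args) {
        int ip = 0x7F000001;
        System.out.println(nestedWithRoundTrip(ip) + " conversions=" + conversions);
        conversions = 0;
        System.out.println(nestedDirect(ip) + " conversions=" + conversions);
    }
}
```

Both paths return the same value; the direct path simply skips the two conversions per nested call, which is where the issue expects the throughput gain to come from.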
[jira] [Commented] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
[ https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17341677#comment-17341677 ]

lynn1.zhang commented on FLINK-21345:
-

Hi [~Leonard Xu], I have updated this PR.

> NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
> --
>
> Key: FLINK-21345
> URL: https://issues.apache.org/jira/browse/FLINK-21345
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.1
> Environment: Planner: BlinkPlanner
> Flink Version: 1.12.1_2.11
> Java Version: 1.8
> OS: mac os
> Reporter: lynn1.zhang
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: image-2021-02-10-16-00-45-553.png
>
>
> First Step: Create 2 source tables as below:
> {code:java}
> CREATE TABLE test_streaming(
>   vid BIGINT,
>   ts BIGINT,
>   proc AS proctime()
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'test-streaming',
>   'properties.bootstrap.servers' = '127.0.0.1:9092',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
> CREATE TABLE test_streaming2(
>   vid BIGINT,
>   ts BIGINT,
>   proc AS proctime()
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'test-streaming2',
>   'properties.bootstrap.servers' = '127.0.0.1:9092',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
> {code}
> Second Step: Create a TEMPORARY table function, function name: dim, key: vid, timestamp: proctime()
> Third Step: Join the UNION ALL of test_streaming and test_streaming2 with dim, as below:
> {code:java}
> SELECT r.vid, d.name, timestamp_from_long(r.ts)
> FROM (
>   SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
> ) AS r,
> LATERAL TABLE (dim(r.proc)) AS d
> WHERE r.vid = d.vid;
> {code}
> Exception Detail: (if only test-streaming or only test-streaming2 is joined with the temporary table function, the program runs OK)
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>     at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
>     at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>     at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>     at
[jira] [Updated] (FLINK-21833) TemporalRowTimeJoinOperator.java will lead to the state expansion by short-life-cycle & huge RowData, although config idle.state.retention.time
[ https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-21833: Description: In our production use case, short-lived, huge RowData records are joined with each other by this operator. Each key, called session_id, expires after 2 minutes. With idle.state.retention.time configured, the operator cleans up leftState and rightState once a key has gone unused for 2 minutes, but the nextLeftIndex and registeredTimer state data is kept forever. After running for a day (about 20 million session_id values in our case), checkpointing causes the job to crash. I have found the bug and fixed it. !image-2021-03-17-11-06-21-768.png! was: In our company cases, short-life-cycle & huge RowData will use this operator to join each other. Every key called session_id will be expired after 2min. With idle.state.retention.time configuration, after 2min, the leftState and rightState will be cleaned up by the operator but nextLeftIndex & registeredTimer state data will be stored forever. After running a day(about 20 million session_id in our company cases), the checkpoint operator will cause the job crash. I have found the bug, and fixed it. !image-2021-03-17-11-06-21-768.png! > TemporalRowTimeJoinOperator.java will lead to the state expansion by > short-life-cycle & huge RowData, although config idle.state.retention.time > --- > > Key: FLINK-21833 > URL: https://issues.apache.org/jira/browse/FLINK-21833 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.12.2 > Reporter: lynn1.zhang > Priority: Major > Labels: pull-request-available > Attachments: image-2021-03-17-11-06-21-768.png > > > In our production use case, short-lived, huge RowData records are joined > with each other by this operator. Each key, called session_id, expires after 2 minutes. 
> With idle.state.retention.time configured, the operator cleans up leftState > and rightState once a key has gone unused for 2 minutes, but the nextLeftIndex > and registeredTimer state data is kept forever. > After running for a day (about 20 million session_id values in our case), > checkpointing causes the job to crash. > I have found the bug and fixed it. > !image-2021-03-17-11-06-21-768.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
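The leak described above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not Flink's actual operator code and not necessarily the fix in the attached screenshot: plain HashMaps stand in for keyed state, the state names leftState, rightState, nextLeftIndex and registeredTimer are taken from the description, and the class KeyedJoinStateSketch and all of its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of per-key join state. NOT Flink's TemporalRowTimeJoinOperator:
 * HashMaps stand in for keyed state, and every class and method name here
 * is hypothetical and used only for illustration.
 */
final class KeyedJoinStateSketch {
    // State names mirror those mentioned in the issue description.
    private final Map<String, String> leftState = new HashMap<>();
    private final Map<String, String> rightState = new HashMap<>();
    private final Map<String, Long> nextLeftIndex = new HashMap<>();
    private final Map<String, Long> registeredTimer = new HashMap<>();

    /** Stores one left and one right row for a key and touches all four states. */
    void process(String key, String leftRow, String rightRow, long timerTimestamp) {
        leftState.put(key, leftRow);
        rightState.put(key, rightRow);
        nextLeftIndex.merge(key, 1L, Long::sum);
        registeredTimer.put(key, timerTimestamp);
    }

    /** Cleanup as described in the report: only the two row states are cleared. */
    void cleanUpRowsOnly(String key) {
        leftState.remove(key);
        rightState.remove(key);
    }

    /** Cleanup that clears every piece of state kept for the key. */
    void cleanUpAll(String key) {
        cleanUpRowsOnly(key);
        nextLeftIndex.remove(key);
        registeredTimer.remove(key);
    }

    /** Total number of entries still held across all four states. */
    int retainedEntries() {
        return leftState.size() + rightState.size()
                + nextLeftIndex.size() + registeredTimer.size();
    }

    public static void main(String[] args) {
        KeyedJoinStateSketch rowsOnly = new KeyedJoinStateSketch();
        KeyedJoinStateSketch full = new KeyedJoinStateSketch();
        // Each session_id is seen once and then expires, as in the report.
        for (int i = 0; i < 1000; i++) {
            String sessionId = "session-" + i;
            rowsOnly.process(sessionId, "left", "right", i);
            rowsOnly.cleanUpRowsOnly(sessionId);
            full.process(sessionId, "left", "right", i);
            full.cleanUpAll(sessionId);
        }
        System.out.println("rows-only cleanup retains " + rowsOnly.retainedEntries() + " entries");
        System.out.println("full cleanup retains " + full.retainedEntries() + " entries");
    }
}
```

In this model every expired key leaves two small entries behind when only the row states are cleared, so the retained state grows linearly with the number of distinct keys ever seen. That matches the reported symptom of state growing despite idle.state.retention.time being set.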
[jira] [Updated] (FLINK-21833) TemporalRowTimeJoinOperator.java will lead to the state expansion by short-life-cycle & huge RowData, although config idle.state.retention.time
[ https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-21833: Description: In our company cases, short-life-cycle & huge RowData will use this operator to join each other. Every key called session_id will be expired after 2min. With idle.state.retention.time configuration, after 2min, the leftState and rightState will be cleaned up by the operator but nextLeftIndex & registeredTimer state data will be stored forever. After running a day(about 20 million session_id in our company cases), the checkpoint operator will cause the job crash. I have found the bug, and fixed it. !image-2021-03-17-11-06-21-768.png! was: In our company cases, short-life-cycle & huge RowData will use this operator to join each other. Every key called session_id will be expired after 2min. With idle.state.retention.time configuration, after 2min, the leftState and rightState will be cleaned up by the operator but nextLeftIndex & registeredTimer state data will be stored forever. After running a day(about 20 million session_id in our cases ), the checkpoint operator will cause the job crash. I have found the bug, and fixed it. !image-2021-03-17-11-06-21-768.png! > TemporalRowTimeJoinOperator.java will lead to the state expansion by > short-life-cycle & huge RowData, although config idle.state.retention.time > --- > > Key: FLINK-21833 > URL: https://issues.apache.org/jira/browse/FLINK-21833 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.12.2 >Reporter: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2021-03-17-11-06-21-768.png > > > In our company cases, short-life-cycle & huge RowData will use this operator > to join each other. Every key called session_id will be expired after 2min. 
> With idle.state.retention.time configuration, after 2min, the leftState and > rightState will be cleaned up by the operator but nextLeftIndex & > registeredTimer state data will be stored forever. > After running a day(about 20 million session_id in our company cases), the > checkpoint operator will cause the job crash. > I have found the bug, and fixed it. > !image-2021-03-17-11-06-21-768.png!
[jira] [Updated] (FLINK-21833) TemporalRowTimeJoinOperator.java will lead to the state expansion by short-life-cycle & huge RowData, although config idle.state.retention.time
[ https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-21833: Description: In our company cases, short-life-cycle & huge RowData will use this operator to join each other. Every key called session_id will be expired after 2min. With idle.state.retention.time configuration, after 2min, the leftState and rightState will be cleaned up by the operator but nextLeftIndex & registeredTimer state data will be stored forever. After running a day(about 20 million session_id in our cases ), the checkpoint operator will cause the job crash. I have found the bug, and fixed it. !image-2021-03-17-11-06-21-768.png! was: In our company cases, short-life-cycle & huge RowData will use this operator to join each other. Every key called session_id will be expired after 2min. With idle.state.retention.time configuration, after 2min, the leftState and rightState will be cleaned up by the operator but nextLeftIndex & registeredTimer will be stored forever. After running a day(about 20 million session_id in our cases ), the checkpoint operator will cause the job crash. I have found the bug, and fixed it. !image-2021-03-17-11-06-21-768.png! > TemporalRowTimeJoinOperator.java will lead to the state expansion by > short-life-cycle & huge RowData, although config idle.state.retention.time > --- > > Key: FLINK-21833 > URL: https://issues.apache.org/jira/browse/FLINK-21833 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.12.2 >Reporter: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2021-03-17-11-06-21-768.png > > > In our company cases, short-life-cycle & huge RowData will use this operator > to join each other. Every key called session_id will be expired after 2min. 
> With idle.state.retention.time configuration, after 2min, the leftState and > rightState will be cleaned up by the operator but nextLeftIndex & > registeredTimer state data will be stored forever. > After running a day(about 20 million session_id in our cases ), the > checkpoint operator will cause the job crash. > I have found the bug, and fixed it. > !image-2021-03-17-11-06-21-768.png!
[jira] [Updated] (FLINK-21833) TemporalRowTimeJoinOperator.java will lead to the state expansion by short-life-cycle & huge RowData, although config idle.state.retention.time
[ https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-21833: Description: In our company cases, short-life-cycle & huge RowData will use this operator to join each other. Every key called session_id will be expired after 2min. With idle.state.retention.time configuration, after 2min, the leftState and rightState will be cleaned up by the operator but nextLeftIndex & registeredTimer will be stored forever. After running a day(about 20 million session_id in our cases ), the checkpoint operator will cause the job crash. I have found the bug, and fixed it. !image-2021-03-17-11-06-21-768.png! was: In our company cases, short-life-cycle & huge RowData will use this operator to join each other. Every key called session_id will be expired after 2min. With idle.state.retention.time configuration, after 2min passed, the leftState and rightState will be cleaned up by the operator but nextLeftIndex & registeredTimer will be stored forever. After running a day(about 20 million session_id in our cases ), the checkpoint operator will cause the job crash. I have found the bug, and fixed it. !image-2021-03-17-11-06-21-768.png! > TemporalRowTimeJoinOperator.java will lead to the state expansion by > short-life-cycle & huge RowData, although config idle.state.retention.time > --- > > Key: FLINK-21833 > URL: https://issues.apache.org/jira/browse/FLINK-21833 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.12.2 >Reporter: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2021-03-17-11-06-21-768.png > > > In our company cases, short-life-cycle & huge RowData will use this operator > to join each other. Every key called session_id will be expired after 2min. 
> With idle.state.retention.time configuration, after 2min, the leftState and > rightState will be cleaned up by the operator but nextLeftIndex & > registeredTimer will be stored forever. > After running a day(about 20 million session_id in our cases ), the > checkpoint operator will cause the job crash. > I have found the bug, and fixed it. > !image-2021-03-17-11-06-21-768.png!
[jira] [Updated] (FLINK-21833) TemporalRowTimeJoinOperator.java will lead to the state expansion by short-life-cycle & huge RowData, although config idle.state.retention.time
[ https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-21833: Description: In our company cases, short-life-cycle & huge RowData will use this operator to join each other. Every key called session_id will be expired after 2min. With idle.state.retention.time configuration, after 2min passed, the leftState and rightState will be cleaned up by the operator but nextLeftIndex & registeredTimer will be stored forever. After running a day(about 20 million session_id in our cases ), the checkpoint operator will cause the job crash. I have found the bug, and fixed it. !image-2021-03-17-11-06-21-768.png! was: TemporalRowTimeJoinOperator feature will lead to unlimited data expansion, although configure idle.state.retention.time I have found the bug, and fixed it. !image-2021-03-17-11-06-21-768.png! Summary: TemporalRowTimeJoinOperator.java will lead to the state expansion by short-life-cycle & huge RowData, although config idle.state.retention.time (was: TemporalRowTimeJoinOperator State Leak Although configure idle.state.retention.time) > TemporalRowTimeJoinOperator.java will lead to the state expansion by > short-life-cycle & huge RowData, although config idle.state.retention.time > --- > > Key: FLINK-21833 > URL: https://issues.apache.org/jira/browse/FLINK-21833 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.12.2 >Reporter: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2021-03-17-11-06-21-768.png > > > In our company cases, short-life-cycle & huge RowData will use this operator > to join each other. Every key called session_id will be expired after 2min. > With idle.state.retention.time configuration, after 2min passed, the > leftState and rightState will be cleaned up by the operator but nextLeftIndex > & registeredTimer will be stored forever. 
> After running a day(about 20 million session_id in our cases ), the > checkpoint operator will cause the job crash. > I have found the bug, and fixed it. > !image-2021-03-17-11-06-21-768.png!
[jira] [Commented] (FLINK-21833) TemporalRowTimeJoinOperator State Leak Although configure idle.state.retention.time
[ https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17303838#comment-17303838 ] lynn1.zhang commented on FLINK-21833: - Please take a look at this. We depend on this in our production environment. Thank you very much. [~Leonard Xu] > TemporalRowTimeJoinOperator State Leak Although configure > idle.state.retention.time > --- > > Key: FLINK-21833 > URL: https://issues.apache.org/jira/browse/FLINK-21833 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.12.2 >Reporter: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2021-03-17-11-06-21-768.png > > > TemporalRowTimeJoinOperator feature will lead to unlimited data expansion, > although configure idle.state.retention.time > I have found the bug, and fixed it. > !image-2021-03-17-11-06-21-768.png!
[jira] [Updated] (FLINK-21833) TemporalRowTimeJoinOperator State Leak Although configure idle.state.retention.time
[ https://issues.apache.org/jira/browse/FLINK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-21833: Description: TemporalRowTimeJoinOperator feature will lead to unlimited data expansion, although configure idle.state.retention.time I have found the bug, and fixed it. !image-2021-03-17-11-06-21-768.png! was: Use TemporalRowTimeJoinOperator feature will lead to unlimited data expansion, although configure idle.state.retention.time I have found the bug, and fixed it. !image-2021-03-17-11-06-21-768.png! > TemporalRowTimeJoinOperator State Leak Although configure > idle.state.retention.time > --- > > Key: FLINK-21833 > URL: https://issues.apache.org/jira/browse/FLINK-21833 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.12.2 >Reporter: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2021-03-17-11-06-21-768.png > > > TemporalRowTimeJoinOperator feature will lead to unlimited data expansion, > although configure idle.state.retention.time > I have found the bug, and fixed it. > !image-2021-03-17-11-06-21-768.png!
[jira] [Created] (FLINK-21833) TemporalRowTimeJoinOperator State Leak Although configure idle.state.retention.time
lynn1.zhang created FLINK-21833: --- Summary: TemporalRowTimeJoinOperator State Leak Although configure idle.state.retention.time Key: FLINK-21833 URL: https://issues.apache.org/jira/browse/FLINK-21833 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.12.2 Reporter: lynn1.zhang Attachments: image-2021-03-17-11-06-21-768.png Use TemporalRowTimeJoinOperator feature will lead to unlimited data expansion, although configure idle.state.retention.time I have found the bug, and fixed it. !image-2021-03-17-11-06-21-768.png!
[jira] [Comment Edited] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
[ https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17282291#comment-17282291 ] lynn1.zhang edited comment on FLINK-21345 at 2/17/21, 10:36 AM: [~jark] [~Leonard Xu] !image-2021-02-10-16-00-45-553.png! I try to add the code of the method getRelOptSchema in LogicalCorrelateToJoinFromTemporalTableFunctionRule.java, It seems run ok. was (Author: zicat): [~jark] [~Leonard Xu] !image-2021-02-10-16-00-45-553.png! I add the code of the method getRelOptSchema in LogicalCorrelateToJoinFromTemporalTableFunctionRule, It seems run ok. Can I create a merge request to fix the issue? > NullPointerException > LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157 > -- > > Key: FLINK-21345 > URL: https://issues.apache.org/jira/browse/FLINK-21345 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.12.1 > Environment: Planner: BlinkPlanner > Flink Version: 1.12.1_2.11 > Java Version: 1.8 > OS: mac os >Reporter: lynn1.zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2021-02-10-16-00-45-553.png > > > First Step: Create 2 Source Tables as below: > {code:java} > CREATE TABLE test_streaming( > vid BIGINT, > ts BIGINT, > proc AS proctime() > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'test-streaming', > 'properties.bootstrap.servers' = '127.0.0.1:9092', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'json' > ); > CREATE TABLE test_streaming2( > vid BIGINT, > ts BIGINT, > proc AS proctime() > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'test-streaming2', > 'properties.bootstrap.servers' = '127.0.0.1:9092', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'json' > ); > {code} > Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, > timestamp:proctime() > Third Step: test_streaming union all test_streaming2 join dim like below: > {code:java} > SELECT r.vid,d.name,timestamp_from_long(r.ts) > FROM ( > SELECT * FROM 
test_streaming UNION ALL SELECT * FROM test_streaming2 > ) AS r, > LATERAL TABLE (dim(r.proc)) AS d > WHERE r.vid = d.vid; > {code} > Exception Detail: (if only use test-streaming or test-streaming2 join > temporary table function, the program run ok) > {code:java} > Exception in thread "main" java.lang.NullPointerException > at > org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157) > at > org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99) > at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) > at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) > at > org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) > at > org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) > at > org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) > at > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) > at > org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) > at > 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) > at scala.collection.Iterator$class.foreach(Iterator.scala:742) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at >
[jira] [Comment Edited] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
[ https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17282291#comment-17282291 ] lynn1.zhang edited comment on FLINK-21345 at 2/10/21, 8:51 AM: --- [~jark] [~Leonard Xu] !image-2021-02-10-16-00-45-553.png! I add the code of the method getRelOptSchema in LogicalCorrelateToJoinFromTemporalTableFunctionRule, It seems run ok. Can I create a merge request to fix the issue? was (Author: zicat): [~jark] [~Leonard Xu] !image-2021-02-10-16-00-45-553.png! I add the code of the method getRelOptSchema in LogicalCorrelateToJoinFromTemporalTableFunctionRule, It seems run ok. Can I create a merge request to fix the issue. > NullPointerException > LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157 > -- > > Key: FLINK-21345 > URL: https://issues.apache.org/jira/browse/FLINK-21345 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.12.1 > Environment: Planner: BlinkPlanner > Flink Version: 1.12.1_2.11 > Java Version: 1.8 > OS: mac os >Reporter: lynn1.zhang >Priority: Major > Attachments: image-2021-02-10-16-00-45-553.png > > > First Step: Create 2 Source Tables as below: > {code:java} > CREATE TABLE test_streaming( > vid BIGINT, > ts BIGINT, > proc AS proctime() > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'test-streaming', > 'properties.bootstrap.servers' = '127.0.0.1:9092', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'json' > ); > CREATE TABLE test_streaming2( > vid BIGINT, > ts BIGINT, > proc AS proctime() > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'test-streaming2', > 'properties.bootstrap.servers' = '127.0.0.1:9092', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'json' > ); > {code} > Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, > timestamp:proctime() > Third Step: test_streaming union all test_streaming2 join dim like below: > {code:java} > SELECT r.vid,d.name,timestamp_from_long(r.ts) > FROM ( > SELECT * 
FROM test_streaming UNION ALL SELECT * FROM test_streaming2 > ) AS r, > LATERAL TABLE (dim(r.proc)) AS d > WHERE r.vid = d.vid; > {code} > Exception Detail: (if only use test-streaming or test-streaming2 join > temporary table function, the program run ok) > {code:java} > Exception in thread "main" java.lang.NullPointerException > at > org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157) > at > org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99) > at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) > at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) > at > org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) > at > org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) > at > org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) > at > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) > at > org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) > at > 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) > at scala.collection.Iterator$class.foreach(Iterator.scala:742) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at >
[jira] [Comment Edited] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
[ https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17282291#comment-17282291 ] lynn1.zhang edited comment on FLINK-21345 at 2/10/21, 8:51 AM: --- [~jark] [~Leonard Xu] !image-2021-02-10-16-00-45-553.png! I add the code of the method getRelOptSchema in LogicalCorrelateToJoinFromTemporalTableFunctionRule, It seems run ok. Can I create a merge request to fix the issue. was (Author: zicat): [~jark] [~Leonard Xu] !image-2021-02-10-16-00-45-553.png! I add the code of the method getRelOptSchema in LogicalCorrelateToJoinFromTemporalTableFunctionRule, It seems run ok. > NullPointerException > LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157 > -- > > Key: FLINK-21345 > URL: https://issues.apache.org/jira/browse/FLINK-21345 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.12.1 > Environment: Planner: BlinkPlanner > Flink Version: 1.12.1_2.11 > Java Version: 1.8 > OS: mac os >Reporter: lynn1.zhang >Priority: Major > Attachments: image-2021-02-10-16-00-45-553.png > > > First Step: Create 2 Source Tables as below: > {code:java} > CREATE TABLE test_streaming( > vid BIGINT, > ts BIGINT, > proc AS proctime() > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'test-streaming', > 'properties.bootstrap.servers' = '127.0.0.1:9092', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'json' > ); > CREATE TABLE test_streaming2( > vid BIGINT, > ts BIGINT, > proc AS proctime() > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'test-streaming2', > 'properties.bootstrap.servers' = '127.0.0.1:9092', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'json' > ); > {code} > Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, > timestamp:proctime() > Third Step: test_streaming union all test_streaming2 join dim like below: > {code:java} > SELECT r.vid,d.name,timestamp_from_long(r.ts) > FROM ( > SELECT * FROM test_streaming UNION ALL SELECT * FROM 
test_streaming2 > ) AS r, > LATERAL TABLE (dim(r.proc)) AS d > WHERE r.vid = d.vid; > {code} > Exception Detail: (if only use test-streaming or test-streaming2 join > temporary table function, the program run ok) > {code:java} > Exception in thread "main" java.lang.NullPointerException > at > org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157) > at > org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99) > at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) > at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) > at > org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) > at > org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) > at > org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) > at > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) > at > org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) > at 
scala.collection.Iterator$class.foreach(Iterator.scala:742) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) > at >
[jira] [Commented] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
[ https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17282291#comment-17282291 ]

lynn1.zhang commented on FLINK-21345:
-

[~jark] [~Leonard Xu]

!image-2021-02-10-16-00-45-553.png!

I added code to the getRelOptSchema method in LogicalCorrelateToJoinFromTemporalTableFunctionRule, and it seems to run OK.

> NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
> --
>
> Key: FLINK-21345
> URL: https://issues.apache.org/jira/browse/FLINK-21345
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.1
> Environment: Planner: BlinkPlanner
> Flink Version: 1.12.1_2.11
> Java Version: 1.8
> OS: macOS
> Reporter: lynn1.zhang
> Priority: Major
> Attachments: image-2021-02-10-16-00-45-553.png
>
>
> First Step: Create 2 source tables as below:
> {code:java}
> CREATE TABLE test_streaming(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> CREATE TABLE test_streaming2(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming2',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> {code}
> Second Step: Create a TEMPORARY Table Function, function name: dim, key: vid, timestamp: proctime()
> Third Step: test_streaming UNION ALL test_streaming2, joined with dim as below:
> {code:java}
> SELECT r.vid,d.name,timestamp_from_long(r.ts)
> FROM (
>  SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
> ) AS r,
> LATERAL TABLE (dim(r.proc)) AS d
> WHERE r.vid = d.vid;
> {code}
> Exception Detail: (if only test-streaming or test-streaming2 is joined with the temporary table function, the program runs OK)
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>  at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
>  at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
>  at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>  at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>  at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>  at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>  at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>  at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>  at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>  at
[jira] [Updated] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
[ https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-21345: Attachment: image-2021-02-10-16-00-45-553.png
[jira] [Updated] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
[ https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-21345: Description: First Step: Create 2 Source Tables as below: {code:java} CREATE TABLE test_streaming( vid BIGINT, ts BIGINT, proc AS proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'test-streaming', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE test_streaming2( vid BIGINT, ts BIGINT, proc AS proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'test-streaming2', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); {code} Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, timestamp:proctime() Third Step: test_streaming union all test_streaming2 join dim like below: {code:java} SELECT r.vid,d.name,timestamp_from_long(r.ts) FROM ( SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2 ) AS r, LATERAL TABLE (dim(r.proc)) AS d WHERE r.vid = d.vid; {code} Exception Detail: (if only use test-streaming or test-streaming2 join temporary table function, the program run ok) {code:java} Exception in thread "main" java.lang.NullPointerException at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157) at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.Iterator$class.foreach(Iterator.scala:742) at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.immutable.Range.foreach(Range.scala:166) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.Iterator$class.foreach(Iterator.scala:742) at
[jira] [Updated] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
[ https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-21345: Description: First Step: Create 2 Source Tables as below: {code:java} CREATE TABLE test_streaming( vid BIGINT, ts BIGINT, proc AS proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'test-streaming', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE test_streaming2( vid BIGINT, ts BIGINT, proc AS proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'test-streaming2', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); {code} Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, timestamp:proctime() Third Step: test_streaming union all test_streaming2 join dim like below: {code:java} SELECT r.vid,d.name,timestamp_from_long(r.ts) FROM ( SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2 ) AS r, LATERAL TABLE (dim(r.proc)) AS d WHERE r.vid = d.vid; {code} Exception Detail: (if not union all, the program run ok) {code:java} Exception in thread "main" java.lang.NullPointerException at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157) at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.Iterator$class.foreach(Iterator.scala:742) at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.immutable.Range.foreach(Range.scala:166) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.Iterator$class.foreach(Iterator.scala:742) at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) at
[jira] [Created] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
lynn1.zhang created FLINK-21345: --- Summary: NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157 Key: FLINK-21345 URL: https://issues.apache.org/jira/browse/FLINK-21345 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.12.1 Environment: Planner: BlinkPlanner Flink Version: 1.12.1_2.11 Java Version: 1.8 OS: mac os Reporter: lynn1.zhang First Step: Create 2 Source Tables as below: {code:java} CREATE TABLE test_streaming( vid BIGINT, ts BIGINT, proc AS proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'test-streaming', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE test_streaming2( vid BIGINT, ts BIGINT, proc AS proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'test-streaming2', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); {code} Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, timestamp:proctime() Third Step, test_streaming union all test_streaming2 join dim like below: {code:java} SELECT r.vid,d.name,timestamp_from_long(r.ts) FROM ( SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2 ) AS r, LATERAL TABLE (dim(r.proc)) AS d WHERE r.vid = d.vid; {code} Exception Detail:(if not union all, the program run ok) {code:java} Exception in thread "main" java.lang.NullPointerException at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157) at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.Iterator$class.foreach(Iterator.scala:742) at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155) at 
scala.collection.immutable.Range.foreach(Range.scala:166) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at
[jira] [Comment Edited] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently
[ https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881665#comment-16881665 ]

lynn1.zhang edited comment on FLINK-13072 at 7/10/19 6:33 AM:
--

[~srichter] Done

was (Author: zicat):
[~srichter] Please help to close, thx.

> RocksDBStateBachend is not thread safe and data loss silently
> -
>
> Key: FLINK-13072
> URL: https://issues.apache.org/jira/browse/FLINK-13072
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.8.0, 1.8.1
> Reporter: lynn1.zhang
> Priority: Blocker
> Attachments: flink-demo.zip, image-2019-07-03-17-04-17-253.png
>
>
> I create 2 MapStates in one operator, then create 2 threads in the apply method;
> each thread operates on its own map state (the operator is the same). The expected
> result is that the 2 states have the same result, but they do not. I uploaded the
> code; please help to try it.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Closed] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently
[ https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang closed FLINK-13072. --- Resolution: Not A Bug
[jira] [Commented] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently
[ https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881665#comment-16881665 ] lynn1.zhang commented on FLINK-13072: - [~srichter] Please help to close, thx.
[jira] [Updated] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently
[ https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lynn1.zhang updated FLINK-13072: Attachment: image-2019-07-03-17-04-17-253.png
[jira] [Commented] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently
[ https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877657#comment-16877657 ] lynn1.zhang commented on FLINK-13072: - I think we can modify the document [https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/state/StateBackend.html] [~klion26] !image-2019-07-03-17-04-17-253.png!
[jira] [Comment Edited] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently
[ https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877608#comment-16877608 ]

lynn1.zhang edited comment on FLINK-13072 at 7/3/19 9:02 AM:
-

[~yunta] At first I also thought that {{input.iterator()}} was not thread safe, but when I printed each element, all elements printed successfully. When I replaced RocksDBStateBackend with FsStateBackend, everything was right. When I used a lock object to guard state.get() and state.put(), the result was also right.

was (Author: zicat):
At first I also thought that {{input.iterator()}} was not thread safe, but when I printed each element, all elements printed successfully. When I replaced RocksDBStateBackend with FsStateBackend, everything was right.
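The lock-based workaround mentioned in the comment above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the code from flink-demo.zip and not Flink API: `SharedStore`, `MapStateView`, `increment`, `total` and `run` are hypothetical names, and a plain `HashMap` stands in for a backend that is not thread safe. Two threads each update their own state view, and every get()/put() pair runs under one shared lock object.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: these names are hypothetical and are not Flink API. */
final class GuardedStateSketch {

    /** Stand-in for a backend that is not thread safe (assumption: a plain HashMap). */
    static final class SharedStore {
        final Map<String, Long> data = new HashMap<>();
    }

    /** Stand-in for one map state; both views write into the same SharedStore. */
    static final class MapStateView {
        private final SharedStore store;
        private final String name;

        MapStateView(SharedStore store, String name) {
            this.store = store;
            this.name = name;
        }

        Long get(String key) {
            return store.data.get(name + "/" + key);
        }

        void put(String key, long value) {
            store.data.put(name + "/" + key, value);
        }
    }

    /** Read-modify-write under the shared lock, so get() and put() form one atomic step. */
    static void increment(MapStateView state, Object lock, String key) {
        synchronized (lock) {
            Long current = state.get(key);
            state.put(key, current == null ? 1L : current + 1L);
        }
    }

    /** Sums the counters of one view over keys k0..k9; call only after the workers finished. */
    static long total(MapStateView state) {
        long sum = 0L;
        for (int k = 0; k < 10; k++) {
            Long v = state.get("k" + k);
            sum += (v == null) ? 0L : v;
        }
        return sum;
    }

    /** Two threads process the same elements, each into its own view; returns both totals. */
    static long[] run(int elements) {
        SharedStore store = new SharedStore();
        Object lock = new Object();
        MapStateView stateA = new MapStateView(store, "a");
        MapStateView stateB = new MapStateView(store, "b");

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < elements; i++) {
                increment(stateA, lock, "k" + (i % 10));
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < elements; i++) {
                increment(stateB, lock, "k" + (i % 10));
            }
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] {total(stateA), total(stateB)};
    }

    public static void main(String[] args) {
        long[] totals = run(1000);
        System.out.println(totals[0] + " " + totals[1]);
    }
}
```

Because every access to the shared store goes through the same lock, both views end up with the same totals. The sketch only shows the locking pattern described in the comment; the issue itself was closed with Resolution: Not A Bug.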
[jira] [Commented] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently
[ https://issues.apache.org/jira/browse/FLINK-13072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877608#comment-16877608 ]

lynn1.zhang commented on FLINK-13072:
-

At first I also thought that {{input.iterator()}} was not thread safe, but when I printed each element, all elements printed successfully. When I replaced RocksDBStateBackend with FsStateBackend, everything was right.
[jira] [Created] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently
lynn1.zhang created FLINK-13072:
---

Summary: RocksDBStateBachend is not thread safe and data loss silently
Key: FLINK-13072
URL: https://issues.apache.org/jira/browse/FLINK-13072
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 1.8.1, 1.8.0
Reporter: lynn1.zhang
Attachments: flink-demo.zip

I create 2 MapStates in one operator, then create 2 threads in the apply method; each thread operates on its own map state (the operator is the same). The expected result is that the 2 states have the same result, but they do not. I uploaded the code; please help to try it.