[jira] [Commented] (FLINK-15538) Separate decimal implementations into separate sub-classes
[ https://issues.apache.org/jira/browse/FLINK-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012394#comment-17012394 ] Liya Fan commented on FLINK-15538: -- [~ykt836] Good point. Admittedly, it is hard to say in general which is more expensive: a virtual function call or a branch statement. It depends on environmental factors and JIT behavior. Some results [1] suggest that the performance of bimorphic virtual function calls (a virtual function with two possible implementations) is not much worse than that of a final function call, and much better than that of a general (megamorphic) virtual function call. Another benefit is that the subclasses do not contain irrelevant data fields (e.g. LooseDecimal does not hold the long value). [1] http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ > Separate decimal implementations into separate sub-classes > -- > > Key: FLINK-15538 > URL: https://issues.apache.org/jira/browse/FLINK-15538 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Liya Fan >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The current implementation of Decimal values has two (somewhat independent) > implementations: one is based on Long, while the other is based on > BigDecimal. > This makes the Decimal class unclear (both implementations are cluttered in a > single class) and less efficient (each method involves an if-else branch). > So in this issue, we make Decimal an abstract class, and separate the two > implementations into two sub-classes. This makes the code clearer and more > efficient. -- This message was sent by Atlassian Jira (v8.3.4#803005)
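The proposed split can be sketched as follows. Only LooseDecimal is named in the discussion; the CompactDecimal class, its long field, and the method shown are illustrative assumptions, not Flink's actual code:

```java
import java.math.BigDecimal;

// Sketch only: CompactDecimal and the field names are hypothetical.
abstract class Decimal {
    final int precision;
    final int scale;

    Decimal(int precision, int scale) {
        this.precision = precision;
        this.scale = scale;
    }

    // Each subclass supplies its own representation; callers go through a
    // bimorphic virtual call instead of an if-else branch in every method.
    abstract BigDecimal toBigDecimal();
}

// Backed by an unscaled long value (small precision).
final class CompactDecimal extends Decimal {
    final long longVal;

    CompactDecimal(long longVal, int precision, int scale) {
        super(precision, scale);
        this.longVal = longVal;
    }

    @Override
    BigDecimal toBigDecimal() {
        return BigDecimal.valueOf(longVal, scale);
    }
}

// Backed by a BigDecimal; carries no unused long field.
final class LooseDecimal extends Decimal {
    final BigDecimal decimalVal;

    LooseDecimal(BigDecimal decimalVal, int precision, int scale) {
        super(precision, scale);
        this.decimalVal = decimalVal;
    }

    @Override
    BigDecimal toBigDecimal() {
        return decimalVal;
    }
}
```

With only these two concrete subclasses ever loaded, the call sites stay bimorphic, which is the case the benchmark in [1] measures.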
[jira] [Created] (FLINK-15538) Separate decimal implementations into separate sub-classes
Liya Fan created FLINK-15538: Summary: Separate decimal implementations into separate sub-classes Key: FLINK-15538 URL: https://issues.apache.org/jira/browse/FLINK-15538 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Reporter: Liya Fan The current implementation of Decimal values has two (somewhat independent) implementations: one is based on Long, while the other is based on BigDecimal. This makes the Decimal class unclear (both implementations are cluttered in a single class) and less efficient (each method involves an if-else branch). So in this issue, we make Decimal an abstract class, and separate the two implementations into two sub-classes. This makes the code clearer and more efficient. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14731) LogicalWatermarkAssigner should use specified trait set when doing copy
Liya Fan created FLINK-14731: Summary: LogicalWatermarkAssigner should use specified trait set when doing copy Key: FLINK-14731 URL: https://issues.apache.org/jira/browse/FLINK-14731 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Liya Fan In the LogicalWatermarkAssigner#copy method, the new LogicalWatermarkAssigner object should be created with the trait set from the input parameter, instead of the trait set of the current object. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-13221) Blink planner should set ScheduleMode to LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs
[ https://issues.apache.org/jira/browse/FLINK-13221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889884#comment-16889884 ] Liya Fan commented on FLINK-13221: -- [~ykt836]. Sure. Thanks a lot. > Blink planner should set ScheduleMode to > LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs > - > > Key: FLINK-13221 > URL: https://issues.apache.org/jira/browse/FLINK-13221 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Kurt Young >Assignee: Liya Fan >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0, 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Assigned] (FLINK-13221) Blink planner should set ScheduleMode to LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs
[ https://issues.apache.org/jira/browse/FLINK-13221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan reassigned FLINK-13221: Assignee: Liya Fan > Blink planner should set ScheduleMode to > LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs > - > > Key: FLINK-13221 > URL: https://issues.apache.org/jira/browse/FLINK-13221 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Kurt Young >Assignee: Liya Fan >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13200) Improve the generated code for if statements
Liya Fan created FLINK-13200: Summary: Improve the generated code for if statements Key: FLINK-13200 URL: https://issues.apache.org/jira/browse/FLINK-13200 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Liya Fan Assignee: Liya Fan In the generated code, we often see code snippets like this: if (true) { acc$6.setNullAt(1); } else { acc$6.setField(1, ((int) -1));; } Such code hurts readability and increases the code size, making it more costly to compile and to transfer over the network. In this issue, we remove such useless if conditions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
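The intended simplification can be sketched with a hypothetical code-generator helper (the name genIfElse and its string-based signature are made up for illustration; Flink's code generator is structured differently): when the condition is a constant literal, only the live branch is emitted.

```java
// Hypothetical helper; not Flink's actual codegen API.
class IfStatementGen {
    static String genIfElse(String cond, String thenCode, String elseCode) {
        if ("true".equals(cond)) {
            // Condition is constantly true: emit only the then branch.
            return thenCode;
        }
        if ("false".equals(cond)) {
            // Condition is constantly false: emit only the else branch.
            return elseCode;
        }
        // Non-constant condition: emit the full if-else statement.
        return "if (" + cond + ") {\n" + thenCode + "\n} else {\n" + elseCode + "\n}";
    }
}
```

For the snippet above, the generated code collapses to the single live statement acc$6.setNullAt(1);.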
[jira] [Updated] (FLINK-13108) Remove duplicated type cast in generated code
[ https://issues.apache.org/jira/browse/FLINK-13108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-13108: - Description: There are duplicated cast operations in the generated code. For example, to run org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the generated code looks like this: {{@Override}} {{public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {}} {{org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue());}} {{...}} This issue removes the duplicated type casts. was: There are duplicated cast operations in the generated code. For example, to run org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the generated code looks like this: {{@Override}} {{public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {}} {{org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue());}} {{...}} This issue remove the duplicated type casts. > Remove duplicated type cast in generated code > - > > Key: FLINK-13108 > URL: https://issues.apache.org/jira/browse/FLINK-13108 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > There are duplicated cast operations in the generated code. 
For example, to > run org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the > generated code looks like this: > {{@Override}} > {{public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) throws Exception {}} > {{org.apache.flink.table.dataformat.BaseRow in1 = > (org.apache.flink.table.dataformat.BaseRow) > (org.apache.flink.table.dataformat.BaseRow) > converter$0.toInternal((org.apache.flink.types.Row) element.getValue());}} > {{...}} > This issue removes the duplicated type casts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-13108) Remove duplicated type cast in generated code
[ https://issues.apache.org/jira/browse/FLINK-13108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-13108: - Description: There are duplicated cast operations in the generated code. For example, to run org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the generated code looks like this: {{@Override}} {{public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {}} {{org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue());}} {{...}} This issue remove the duplicated type casts. was: There are duplicated cast operations in the generated code. For example, to run org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the generated code looks like this: {{@Override}} {{public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {}} org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue()); This issue remove the duplicated type casts. > Remove duplicated type cast in generated code > - > > Key: FLINK-13108 > URL: https://issues.apache.org/jira/browse/FLINK-13108 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > There are duplicated cast operations in the generated code. 
For example, to > run org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the > generated code looks like this: > {{@Override}} > {{public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) throws Exception {}} > {{org.apache.flink.table.dataformat.BaseRow in1 = > (org.apache.flink.table.dataformat.BaseRow) > (org.apache.flink.table.dataformat.BaseRow) > converter$0.toInternal((org.apache.flink.types.Row) element.getValue());}} > {{...}} > This issue remove the duplicated type casts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-13108) Remove duplicated type cast in generated code
[ https://issues.apache.org/jira/browse/FLINK-13108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-13108: - Description: There are duplicated cast operations in the generated code. For example, to run org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the generated code looks like this: {{@Override}} {{public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {}} org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue()); This issue remove the duplicated type casts. was: There are duplicated cast operations in the generated code. For example, to run org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the generated code looks like this: @Override public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception { org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue()); This issue remove the duplicated type cast. > Remove duplicated type cast in generated code > - > > Key: FLINK-13108 > URL: https://issues.apache.org/jira/browse/FLINK-13108 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > There are duplicated cast operations in the generated code. 
For example, to > run org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the > generated code looks like this: > {{@Override}} > {{public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) throws Exception {}} > org.apache.flink.table.dataformat.BaseRow in1 = > (org.apache.flink.table.dataformat.BaseRow) > (org.apache.flink.table.dataformat.BaseRow) > converter$0.toInternal((org.apache.flink.types.Row) element.getValue()); > This issue remove the duplicated type casts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13108) Remove duplicated type cast in generated code
Liya Fan created FLINK-13108: Summary: Remove duplicated type cast in generated code Key: FLINK-13108 URL: https://issues.apache.org/jira/browse/FLINK-13108 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Liya Fan Assignee: Liya Fan There are duplicated cast operations in the generated code. For example, to run org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the generated code looks like this: @Override public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception { org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue()); This issue removes the duplicated type casts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-13058) Avoid memory copy for the trimming operations of BinaryString
[ https://issues.apache.org/jira/browse/FLINK-13058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-13058: - Description: For trimming operations of BinaryString (trim, trimLeft, trimRight), if the trimmed string is identical to the original string (which is likely to happen in practice), the memory copy can be avoided by directly returning the original string. (was: For trimming operations of BinaryString (trim, trimLeft, trimRight), if the trimmed string is identical to the original string. The memory copy can be avoided by directly returning the original string. ) > Avoid memory copy for the trimming operations of BinaryString > - > > Key: FLINK-13058 > URL: https://issues.apache.org/jira/browse/FLINK-13058 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Minor > > For trimming operations of BinaryString (trim, trimLeft, trimRight), if the > trimmed string is identical to the original string (which is likely to happen > in practice), the memory copy can be avoided by directly returning the > original string. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13058) Avoid memory copy for the trimming operations of BinaryString
Liya Fan created FLINK-13058: Summary: Avoid memory copy for the trimming operations of BinaryString Key: FLINK-13058 URL: https://issues.apache.org/jira/browse/FLINK-13058 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Reporter: Liya Fan Assignee: Liya Fan For trimming operations of BinaryString (trim, trimLeft, trimRight), if the trimmed string is identical to the original string, the memory copy can be avoided by directly returning the original string. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
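The idea can be sketched with plain java.lang.String standing in for BinaryString (an assumption for illustration; BinaryString's actual trimming works on binary memory segments):

```java
class TrimSketch {
    // Right-trim that returns the receiver itself when nothing is trimmed,
    // so the common no-op case performs no memory copy at all.
    static String trimRight(String s) {
        int end = s.length();
        while (end > 0 && s.charAt(end - 1) == ' ') {
            end--;
        }
        // Nothing was trimmed: hand back the original object, copy-free.
        return end == s.length() ? s : s.substring(0, end);
    }
}
```

The key point is the identity check: the no-trim path returns the same object reference rather than an equal copy.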
[jira] [Updated] (FLINK-13053) Vectorization Support in Flink
[ https://issues.apache.org/jira/browse/FLINK-13053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-13053: - Description: Vectorization is a popular technique in SQL engines today. Compared with the traditional row-based approach, it has some distinct advantages, for example: * Better use of CPU resources (e.g. SIMD) * More compact memory layout * More friendly to compressed data formats. Currently, Flink is based on a row-based SQL engine for both stream and batch workloads. To enjoy the above benefits, we want to bring vectorization to Flink. This involves substantial changes to the existing code base. Therefore, we plan to carry out the changes in small, incremental steps, so as not to affect existing features. We want to apply it to batch workloads first. The details can be found in our [proposal|https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb] . Over the past months, we have developed an initial implementation of the above ideas. Initial performance evaluations on TPC-H benchmarks show that substantial performance improvements can be obtained by vectorization (see the figure below). More details can be found in our [proposal|https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb]. !image-2019-07-02-15-26-39-550.png! Special thanks to @Kurt Young’s team for all the kind help. Special thanks to @Piotr Nowojski for all the valuable feedback and helpful suggestions. was: Vectorization is a popular technique in SQL engines today. Compared with traditional row-based approach, it has some distinct advantages, for example: * Better use of CPU resources (e.g. SIMD) * More compact memory layout * More friendly to compressed data format. Currently, Flink is based on a row-based SQL engine for both stream and batch workloads. To enjoy the above benefits, we want to bring vectorization to Flink. 
This involves substantial changes to the existing code base. Therefore, we give a plan to carry out such changes in small, incremental steps, in order not to affect existing features. We want to apply it to batch workload first. The details can be found in our proposal (https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb) . For the past months, we have developed an initial implementation of the above ideas. Initial performance evaluations on TPC-H benchmarks show that substantial performance improvements can be obtained by vectorization (see the figure below). More details can be found in our proposal (https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb). !image-2019-07-02-15-26-39-550.png! Special thanks to @Kurt Young’s team for all the kind help. Special thanks to @Piotr Nowojski for all the valuable feedback and help suggestions. > Vectorization Support in Flink > -- > > Key: FLINK-13053 > URL: https://issues.apache.org/jira/browse/FLINK-13053 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Critical > Attachments: image-2019-07-02-15-26-39-550.png > > > Vectorization is a popular technique in SQL engines today. Compared with > traditional row-based approach, it has some distinct advantages, for example: > > * Better use of CPU resources (e.g. SIMD) > * More compact memory layout > * More friendly to compressed data format. > > Currently, Flink is based on a row-based SQL engine for both stream and batch > workloads. To enjoy the above benefits, we want to bring vectorization to > Flink. This involves substantial changes to the existing code base. > Therefore, we give a plan to carry out such changes in small, incremental > steps, in order not to affect existing features. We want to apply it to batch > workload first. 
The details can be found in our > [proposal|https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb] > . > > For the past months, we have developed an initial implementation of the above > ideas. Initial performance evaluations on TPC-H benchmarks show that > substantial performance improvements can be obtained by vectorization (see > the figure below). More details can be found in our > [proposal|https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb]. > !image-2019-07-02-15-26-39-550.png! > Special thanks to @Kurt Young’s team for all the kind help. > Special thanks to @Piotr Nowojski for all the valuable feedback and help > suggestions. -- This message was sent by Atlassian JIRA
[jira] [Updated] (FLINK-13053) Vectorization Support in Flink
[ https://issues.apache.org/jira/browse/FLINK-13053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-13053: - Description: Vectorization is a popular technique in SQL engines today. Compared with the traditional row-based approach, it has some distinct advantages, for example: * Better use of CPU resources (e.g. SIMD) * More compact memory layout * More friendly to compressed data formats. Currently, Flink is based on a row-based SQL engine for both stream and batch workloads. To enjoy the above benefits, we want to bring vectorization to Flink. This involves substantial changes to the existing code base. Therefore, we plan to carry out the changes in small, incremental steps, so as not to affect existing features. We want to apply it to batch workloads first. The details can be found in our proposal (https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb) . Over the past months, we have developed an initial implementation of the above ideas. Initial performance evaluations on TPC-H benchmarks show that substantial performance improvements can be obtained by vectorization (see the figure below). More details can be found in our proposal (https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb). !image-2019-07-02-15-26-39-550.png! Special thanks to @Kurt Young’s team for all the kind help. Special thanks to @Piotr Nowojski for all the valuable feedback and helpful suggestions. was: Vectorization is a popular technique in SQL engines today. Compared with traditional row-based approach, it has some distinct advantages, for example: * Better use of CPU resources (e.g. SIMD) * More compact memory layout * More friendly to compressed data format. Currently, Flink is based on a row-based SQL engine for both stream and batch workloads. To enjoy the above benefits, we want to bring vectorization to Flink. 
This involves substantial changes to the existing code base. Therefore, we give a plan to carry out such changes in small, incremental steps, in order not to affect existing features. We want to apply it to batch workload first. The details can be found in our proposal. For the past months, we have developed an initial implementation of the above ideas. Initial performance evaluations on TPC-H benchmarks show that substantial performance improvements can be obtained by vectorization (see the figure below). More details can be found in our proposal. !image-2019-07-02-15-26-39-550.png! Special thanks to @Kurt Young’s team for all the kind help. Special thanks to @Piotr Nowojski for all the valuable feedback and help suggestions. > Vectorization Support in Flink > -- > > Key: FLINK-13053 > URL: https://issues.apache.org/jira/browse/FLINK-13053 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Critical > Attachments: image-2019-07-02-15-26-39-550.png > > > Vectorization is a popular technique in SQL engines today. Compared with > traditional row-based approach, it has some distinct advantages, for example: > > * Better use of CPU resources (e.g. SIMD) > * More compact memory layout > * More friendly to compressed data format. > > Currently, Flink is based on a row-based SQL engine for both stream and batch > workloads. To enjoy the above benefits, we want to bring vectorization to > Flink. This involves substantial changes to the existing code base. > Therefore, we give a plan to carry out such changes in small, incremental > steps, in order not to affect existing features. We want to apply it to batch > workload first. The details can be found in our proposal > (https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb) > . > > For the past months, we have developed an initial implementation of the above > ideas. 
Initial performance evaluations on TPC-H benchmarks show that > substantial performance improvements can be obtained by vectorization (see > the figure below). More details can be found in our proposal > (https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb). > !image-2019-07-02-15-26-39-550.png! > Special thanks to @Kurt Young’s team for all the kind help. > Special thanks to @Piotr Nowojski for all the valuable feedback and help > suggestions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13053) Vectorization Support in Flink
Liya Fan created FLINK-13053: Summary: Vectorization Support in Flink Key: FLINK-13053 URL: https://issues.apache.org/jira/browse/FLINK-13053 Project: Flink Issue Type: New Feature Components: Table SQL / Runtime Reporter: Liya Fan Assignee: Liya Fan Attachments: image-2019-07-02-15-26-39-550.png Vectorization is a popular technique in SQL engines today. Compared with the traditional row-based approach, it has some distinct advantages, for example: * Better use of CPU resources (e.g. SIMD) * More compact memory layout * More friendly to compressed data formats. Currently, Flink is based on a row-based SQL engine for both stream and batch workloads. To enjoy the above benefits, we want to bring vectorization to Flink. This involves substantial changes to the existing code base. Therefore, we plan to carry out the changes in small, incremental steps, so as not to affect existing features. We want to apply it to batch workloads first. The details can be found in our proposal. Over the past months, we have developed an initial implementation of the above ideas. Initial performance evaluations on TPC-H benchmarks show that substantial performance improvements can be obtained by vectorization (see the figure below). More details can be found in our proposal. !image-2019-07-02-15-26-39-550.png! Special thanks to @Kurt Young’s team for all the kind help. Special thanks to @Piotr Nowojski for all the valuable feedback and helpful suggestions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
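As a toy illustration of the layout advantage (not Flink operator code, and the method name is made up): evaluating an expression over a primitive column is one tight loop with a compact memory footprint that the JIT can unroll and auto-vectorize with SIMD, in contrast to per-row object access.

```java
class ColumnarSketch {
    // Sum of the positive values in one column batch. The loop body touches
    // a contiguous primitive array, which is cache- and SIMD-friendly.
    static long sumWherePositive(int[] column) {
        long sum = 0;
        for (int v : column) {
            if (v > 0) {
                sum += v;
            }
        }
        return sum;
    }
}
```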
[jira] [Created] (FLINK-13043) Fix the bug of parsing Dewey number from string
Liya Fan created FLINK-13043: Summary: Fix the bug of parsing Dewey number from string Key: FLINK-13043 URL: https://issues.apache.org/jira/browse/FLINK-13043 Project: Flink Issue Type: Bug Components: Library / CEP Reporter: Liya Fan Assignee: Liya Fan There is a bug in the current implementation for parsing the Dewey number: String[] splits = deweyNumberString.split("\\."); if (splits.length == 0) { return new DeweyNumber(Integer.parseInt(deweyNumberString)); } The length in the if condition should be 1 instead of 0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
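A sketch of the fix, with the surrounding DeweyNumber class simplified to an int array for illustration: String.split on a non-empty input never yields an empty array, so the single-component (no-dot) case has length 1, not 0.

```java
class DeweyParse {
    // Corrected parsing: a string without dots produces exactly one split.
    static int[] parse(String deweyNumberString) {
        String[] splits = deweyNumberString.split("\\.");
        if (splits.length == 1) { // was erroneously: splits.length == 0
            return new int[] { Integer.parseInt(deweyNumberString) };
        }
        int[] parts = new int[splits.length];
        for (int i = 0; i < splits.length; i++) {
            parts[i] = Integer.parseInt(splits[i]);
        }
        return parts;
    }
}
```

With the original `== 0` check, the fast path was dead code and every input fell through to the general loop; for a malformed single-component input the error surfaced in the wrong place.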
[jira] [Assigned] (FLINK-12628) Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism
[ https://issues.apache.org/jira/browse/FLINK-12628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan reassigned FLINK-12628: Assignee: Liya Fan > Check test failure if partition has no consumers in > Execution.getPartitionMaxParallelism > > > Key: FLINK-12628 > URL: https://issues.apache.org/jira/browse/FLINK-12628 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Andrey Zagrebin >Assignee: Liya Fan >Priority: Major > > Currently, we work around this case in Execution.getPartitionMaxParallelism > because of tests: > // TODO consumers.isEmpty() only exists for test, currently there has to be > exactly one consumer in real jobs! > though partition is supposed to have always at least one consumer atm. > We should check which test fails and consider fixing it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12922) Remove method parameter from OperatorCodeGenerator
Liya Fan created FLINK-12922: Summary: Remove method parameter from OperatorCodeGenerator Key: FLINK-12922 URL: https://issues.apache.org/jira/browse/FLINK-12922 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Liya Fan Assignee: Liya Fan The TableConfig parameter of OperatorCodeGenerator#generateOneInputStreamOperator should be removed, because: # This parameter is never actually used. # If it is ever used in the future, we can use ctx.getConfig to get the same object. # The method signatures should be consistent: generateTwoInputStreamOperator does not have this parameter, so generateOneInputStreamOperator should not have it either. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12886) Support container memory segment
[ https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16869126#comment-16869126 ] Liya Fan edited comment on FLINK-12886 at 6/21/19 3:27 AM: --- [~ykt836], [~lzljs3620320] Another advantage I can think of is in-place expanding. In some scenarios, we need to expand the memory capacity, for example: # When we write a big string to a BinaryRowWriter, and the memory segment of the writer is not sufficient to hold the new data. # When we do rehash in a hash table The current steps for expanding a memory space are as follows: # Create a new memory space with a larger size # Copy the data from the old memory space to the new memory space. The problems with this approach: # The memory copy incurs performance overhead # The memory requirement is too big. Let me illustrate problem 2 with a concrete example: suppose we have 100 MB of memory in total, and currently we are using a 40 MB memory space. We want to expand the memory space from 40 MB to 80 MB. According to the above steps, the maximum memory requirement during the steps is 40 + 80 = 120 MB of memory, which is infeasible, because we only have 100 MB of memory. So although the memory space (100 MB) is sufficient to fulfill our request (80 MB), the expansion must fail. This problem can be resolved by ContainerMemorySegment: we simply allocate another 40 MB of memory and append it to the end of the existing memory segments held by the ContainerMemorySegment. Note that the maximum memory requirement during the process is only 80 MB, so our 100 MB memory space will serve the request well. In addition, there is no need for the memory copy, so the performance overhead no longer exists. What do you think? was (Author: fan_li_ya): [~ykt836], [~lzljs3620320] Another advantage I can think of is in-place expanding. 
In some scenarios, we need to expand the memory capacity, for example: # When we write a big string to a BinaryRowWriter, and the memory segment of the writer is not sufficient to hold the new data. # When we do rehash in a hash table The current steps of expanding memory space is as follows: # Create a new memory space with a large size # Copy the data form the old memory space to the new memory space. The problems with this approach: # The memory copy incurs performance overhead # The memory requirement is too big. Let me illustrate problem 2 with a concrete example: suppose we have totally 100 MB memory, and currently we are using a 40 MB memory space. We want to expand the memory space from 40 MB to 80 MB. According to the above steps, this will require 40 + 80 = 120 MB memory, which is infeasible, because we only have 100 MB memory. So although the memory space (100 MB) is sufficient to fulfill our request (80 MB), the expand must fail. This problem can be resolved by ContainerMemorySegment: we simple allocate another 40 MB memory and append them to the end of the existing memory segments held by the ContainerMemorySegment. Our 100 MB memory space will serve the requests well. In addition, there is no need for the memory copy, so the performance overhead is removed. What do you think? > Support container memory segment > > > Key: FLINK-12886 > URL: https://issues.apache.org/jira/browse/FLINK-12886 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Major > Labels: pull-request-available > Attachments: image-2019-06-18-17-59-42-136.png > > Time Spent: 10m > Remaining Estimate: 0h > > We observe that in many scenarios, the operations/algorithms are based on an > array of MemorySegment. These memory segments form a large, combined, and > continuous memory space. > For example, suppose we have an array of n memory segments. 
Memory addresses > from 0 to segment_size - 1 are served by the first memory segment; memory > addresses from segment_size to 2 * segment_size - 1 are served by the second > memory segment, and so on. > Specific algorithms decide the actual MemorySegment to serve the operation > requests. For some rare cases, two or more memory segments serve the > requests. There are many operations based on such a paradigm, for example, > {{BinaryString#matchAt}}, {{SegmentsUtil#copyToBytes}}, > {{LongHashPartition#MatchIterator#get}}, etc. > The problem is that, for memory segment array based operations, large amounts > of code is devoted to > 1. Computing the memory segment index & offset within the memory segment. > 2. Processing boundary cases. For example, to write an integer, there are > only 2 bytes left in the first memory segment, and the remaining 2 bytes must > be written to the next memory segment. > 3. Differentiate
[jira] [Commented] (FLINK-12886) Support container memory segment
[ https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16869126#comment-16869126 ] Liya Fan commented on FLINK-12886: -- [~ykt836], [~lzljs3620320] Another advantage I can think of is in-place expansion.
In some scenarios, we need to expand the memory capacity, for example:
# When we write a big string to a BinaryRowWriter, and the memory segment of the writer is not sufficient to hold the new data.
# When we do a rehash in a hash table.
The current steps for expanding the memory space are as follows:
# Create a new memory space with a larger size.
# Copy the data from the old memory space to the new memory space.
The problems with this approach:
# The memory copy incurs performance overhead.
# The peak memory requirement is too big.
Let me illustrate problem 2 with a concrete example: suppose we have 100 MB of memory in total, and we are currently using a 40 MB memory space that we want to expand to 80 MB. According to the above steps, this will require 40 + 80 = 120 MB of memory, which is infeasible, because we only have 100 MB. So although the memory space (100 MB) is sufficient to fulfill our request (80 MB), the expansion must fail. This problem can be resolved by ContainerMemorySegment: we simply allocate another 40 MB of memory and append it to the end of the existing memory segments held by the ContainerMemorySegment. Our 100 MB memory space will serve the request well. In addition, there is no need for the memory copy, so the performance overhead is removed. What do you think?
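The append-based expansion described above can be sketched as follows. This is a simplified, hypothetical illustration (not Flink's actual API; `ContainerSketch` and its methods are invented for this example): expanding the container only allocates the additional segments and appends them, so no existing data is copied and the peak memory requirement equals the new capacity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of in-place expansion by appending segments.
// Each "segment" is a plain byte[] standing in for a MemorySegment.
class ContainerSketch {
    private final int segmentSize;
    private final List<byte[]> segments = new ArrayList<>();

    ContainerSketch(int segmentSize, int initialSegments) {
        this.segmentSize = segmentSize;
        for (int i = 0; i < initialSegments; i++) {
            segments.add(new byte[segmentSize]);
        }
    }

    int capacity() {
        return segmentSize * segments.size();
    }

    // Expand in place: allocate only the additional segments and append them.
    // Existing segments (and the data in them) are untouched, so no copy occurs.
    void expandTo(int newCapacity) {
        while (capacity() < newCapacity) {
            segments.add(new byte[segmentSize]);
        }
    }

    byte get(int address) {
        return segments.get(address / segmentSize)[address % segmentSize];
    }

    void put(int address, byte value) {
        segments.get(address / segmentSize)[address % segmentSize] = value;
    }
}
```

With 1 KB segments, expanding a 40-segment container to 80 segments allocates only the 40 new segments, matching the 80 MB peak requirement argued for in the comment.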
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12886) Support container memory segment
[ https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868505#comment-16868505 ] Liya Fan commented on FLINK-12886: -- [~lzljs3620320], thanks for your feedback and suggestion. The original intentions of this Jira:
# Make the logic for manipulating the memory segment array simpler.
# Reduce redundant operations to make the code more readable, easier to maintain, and less likely to produce bugs.
# Improve performance by reducing code complexity (see e.g. [https://github.com/apache/flink/pull/8773]).
Maybe I should illustrate the effects with an example in the code.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12886) Support container memory segment
[ https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868500#comment-16868500 ] Liya Fan commented on FLINK-12886: -- [~ykt836], thanks a lot for your feedback. If we choose option 2, I think the value of this proposal is much smaller. I agree that we should first discuss the merits of this utility class. This is what I can think of currently:
Advantages:
# Clearer code structure: all operations for dealing with a set of memory segments are centralized, so redundant code can be removed and the amount of code will be smaller.
# Better performance can be achieved. Currently, some performance overhead is paid due to the overly complex memory segment set operations (see e.g. [https://github.com/apache/flink/pull/8773]).
Disadvantage:
1. Existing methods need to be refactored, and this will be a long, continued process.
In fact, there is some overlap between the functionalities provided by ContainerMemorySegment and SegmentsUtil. Both classes have their merits, but I think ContainerMemorySegment has some advantages over SegmentsUtil:
# It encapsulates state objects like segments, offset, segment size, etc. So it is more appropriate for object-oriented programming, and leads to better code structure.
# It makes it easier to manage the state, which makes bugs less likely.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12886) Support container memory segment
[ https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868251#comment-16868251 ] Liya Fan commented on FLINK-12886: -- [~ykt836] [~lzljs3620320], two ideas to resolve the performance degradation. Would you please give some comments?
1. Let ContainerMemorySegment and MemorySegment extend a common super-interface, which defines the basic operations for accessing data:
{code:java}
public interface MemoryAccessible {
  int getInt(int index);
  void setInt(int index, int value);
  ...
}

public class MemorySegment implements MemoryAccessible ...
public class ContainerMemorySegment implements MemoryAccessible ...
{code}
With this method, the MemorySegment class hierarchy is unaffected, so the performance of code that depends on MemorySegment is not affected. In addition, code that expects a MemoryAccessible can accept both a MemorySegment and a ContainerMemorySegment.
2. ContainerMemorySegment no longer inherits from MemorySegment. In this way, ContainerMemorySegment just acts as a wrapper for a set of MemorySegments, so wherever a MemorySegment is expected, a ContainerMemorySegment cannot be provided. Also, ContainerMemorySegment can be moved to the blink-runtime module, because it is not a general MemorySegment.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
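Option 1 from the comment above can be fleshed out into a compilable sketch. The interface and method names follow the comment's proposal; the two implementations (`HeapSegmentSketch`, `ContainerSegmentSketch`) are invented here purely for illustration and are not Flink classes:

```java
// Common super-interface: call sites written against MemoryAccessible can accept
// either a single segment or a container, while the real MemorySegment hierarchy
// would stay untouched.
interface MemoryAccessible {
    int getInt(int index);
    void setInt(int index, int value);
}

// Stand-in for a single heap-backed segment (illustrative only).
class HeapSegmentSketch implements MemoryAccessible {
    private final int[] data;
    HeapSegmentSketch(int ints) { data = new int[ints]; }
    public int getInt(int index) { return data[index]; }
    public void setInt(int index, int value) { data[index] = value; }
}

// Stand-in for a container over several equally-sized segments: it routes each
// access to the child segment that owns the index.
class ContainerSegmentSketch implements MemoryAccessible {
    private final HeapSegmentSketch[] children;
    private final int intsPerSegment;

    ContainerSegmentSketch(int segmentCount, int intsPerSegment) {
        this.intsPerSegment = intsPerSegment;
        children = new HeapSegmentSketch[segmentCount];
        for (int i = 0; i < segmentCount; i++) {
            children[i] = new HeapSegmentSketch(intsPerSegment);
        }
    }

    public int getInt(int index) {
        return children[index / intsPerSegment].getInt(index % intsPerSegment);
    }

    public void setInt(int index, int value) {
        children[index / intsPerSegment].setInt(index % intsPerSegment, value);
    }
}
```

Because both classes are loaded behind one bimorphic interface, the JIT can still devirtualize most call sites, which is the performance argument made elsewhere in this thread.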
[jira] [Created] (FLINK-12900) Refactor the class hierarchy for BinaryFormat
Liya Fan created FLINK-12900: Summary: Refactor the class hierarchy for BinaryFormat Key: FLINK-12900 URL: https://issues.apache.org/jira/browse/FLINK-12900 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Reporter: Liya Fan Assignee: Liya Fan There are many classes in the class hierarchy of BinaryFormat. They share the same memory format: header + nullable bits + fixed-length part + variable-length part. So many operations can be applied to a number of sub-classes. Currently, many such operations are implemented in each sub-class, although they implement identical functionality. This makes the code hard to understand and maintain. In this proposal, we refactor the class hierarchy and move common operations into the base class, leaving only one implementation for each common operation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
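The refactoring described above can be sketched as follows. All names (`BinaryFormatSketch`, `BinaryRowSketch`) and the 8-bit header are hypothetical stand-ins, not Flink's actual layout: the point is that a shared operation such as null-bit testing lives once in the base class instead of being duplicated in every sub-class that uses the common memory format.

```java
// Hypothetical base class for the shared layout:
// header + nullable bits + fixed-length part + variable-length part.
abstract class BinaryFormatSketch {
    protected final byte[] bytes;   // stand-in for the backing memory segments
    protected final int headerBits; // bits occupied by the header before the null bits

    BinaryFormatSketch(byte[] bytes, int headerBits) {
        this.bytes = bytes;
        this.headerBits = headerBits;
    }

    // Implemented once here instead of in every sub-class:
    // test the null bit of field `pos`.
    boolean isNullAt(int pos) {
        int bit = headerBits + pos;
        return (bytes[bit / 8] & (1 << (bit % 8))) != 0;
    }

    // Set the null bit of field `pos`.
    void setNullAt(int pos) {
        int bit = headerBits + pos;
        bytes[bit / 8] |= (1 << (bit % 8));
    }
}

// Sub-classes only supply what differs (here: an assumed 8-bit header).
class BinaryRowSketch extends BinaryFormatSketch {
    BinaryRowSketch(byte[] bytes) { super(bytes, 8); }
}
```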
[jira] [Commented] (FLINK-12886) Support container memory segment
[ https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16867424#comment-16867424 ] Liya Fan commented on FLINK-12886: -- [~ykt836] Thanks a lot for the article. I will take a look. > Support container memory segment > > > Key: FLINK-12886 > URL: https://issues.apache.org/jira/browse/FLINK-12886 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Major > Labels: pull-request-available > Attachments: image-2019-06-18-17-59-42-136.png > > Time Spent: 10m > Remaining Estimate: 0h > > We observe that in many scenarios, the operations/algorithms are based on an > array of MemorySegment. These memory segments form a large, combined, and > continuous memory space. > For example, suppose we have an array of n memory segments. Memory addresses > from 0 to segment_size - 1 are served by the first memory segment; memory > addresses from segment_size to 2 * segment_size - 1 are served by the second > memory segment, and so on. > Specific algorithms decide the actual MemorySegment to serve the operation > requests. For some rare cases, two or more memory segments serve the > requests. There are many operations based on such a paradigm, for example, > {{BinaryString#matchAt}}, {{SegmentsUtil#copyToBytes}}, > {{LongHashPartition#MatchIterator#get}}, etc. > The problem is that, for memory segment array based operations, large amounts > of code is devoted to > 1. Computing the memory segment index & offset within the memory segment. > 2. Processing boundary cases. For example, to write an integer, there are > only 2 bytes left in the first memory segment, and the remaining 2 bytes must > be written to the next memory segment. > 3. Differentiate processing for short/long data. For example, when copying > memory data to a byte array. 
Different methods are implemented for cases when > 1) the data fits in a single segment; 2) the data spans multiple segments. > Therefore, there are much duplicated code to achieve above purposes. What is > worse, this paradigm significantly increases the amount of code, making the > code more difficult to read and maintain. Furthermore, it easily gives rise > to bugs which difficult to find and debug. > To address these problems, we propose a new type of memory segment: > {{ContainerMemorySegment}}. It is based on an array of underlying memory > segments with the same size. It extends from the {{MemorySegment}} base > class, so it provides all the functionalities provided by {{MemorySegment}}. > In addition, it hides all the details for dealing with specific memory > segments, and acts as if it were a big continuous memory region. > A prototype implementation is given below: > !image-2019-06-18-17-59-42-136.png|thumbnail! > With this new type of memory segment, many operations/algorithms can be > greatly simplified, without affecting performance. This is because, > 1. Many checks, boundary processing are already there. We just move them to > the new class. > 2. We optimize the implementation of the new class, so the special > optimizations (e.g. optimizations for short data) are still preserved. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12886) Support container memory segment
[ https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16867398#comment-16867398 ] Liya Fan commented on FLINK-12886: -- Hi [~lzljs3620320], thanks for your comments. Please see my replies in-line:
{color:#59afe1}"First, I don't think one more MemorySegment implementation is a good thing. You can take a look at the comments of MemorySegment: for best efficiency, the code that uses this class should make sure that only one subclass is loaded, or that the methods that are abstract in this class are used only from one of the subclasses."{color}
- Sure. This is the greatest obstacle for this proposal. A new type of MemorySegment would introduce a performance penalty. The question is how great the penalty is, and whether the benefits introduced by this proposal justify the overhead.
{color:#59afe1}"Second, the segment size may not be a power of 2 in BinaryString and BinaryRow. You can see BinaryRowSerializer.deserialize."{color}
- We can cut the segments into chunks with sizes equal to a power of 2. Note that the purpose of a power-of-2 segment size is just to improve performance, so it is something nice to have.
{color:#59afe1}"Third, in LongHashPartition, the calculated offset will be reused by getLong. And it doesn't have that much code."{color}
- In my opinion, there are many places in the code base where a simple operation is made complicated, which makes the code hard to understand and maintain. I think you know more about this than I do.
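The performance benefit of a power-of-2 segment size mentioned above is that the per-access index/offset computation reduces to a shift and a mask instead of a division and a modulo. A small sketch (`SegmentAddressing` is a hypothetical helper, not a Flink class):

```java
// Hypothetical helper: address -> (segment index, offset) for power-of-2 segments.
final class SegmentAddressing {
    private final int segmentSizeBits; // log2(segmentSize)
    private final int segmentSizeMask; // segmentSize - 1

    SegmentAddressing(int segmentSize) {
        // Exactly one bit set <=> power of 2.
        if (Integer.bitCount(segmentSize) != 1) {
            throw new IllegalArgumentException("segment size must be a power of 2");
        }
        this.segmentSizeBits = Integer.numberOfTrailingZeros(segmentSize);
        this.segmentSizeMask = segmentSize - 1;
    }

    // Equivalent to address / segmentSize, but a single shift.
    int segmentIndex(long address) {
        return (int) (address >>> segmentSizeBits);
    }

    // Equivalent to address % segmentSize, but a single mask.
    int offsetInSegment(long address) {
        return (int) (address & segmentSizeMask);
    }
}
```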
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12886) Support container memory segment
Liya Fan created FLINK-12886: Summary: Support container memory segment Key: FLINK-12886 URL: https://issues.apache.org/jira/browse/FLINK-12886 Project: Flink Issue Type: New Feature Components: Table SQL / Runtime Reporter: Liya Fan Assignee: Liya Fan Attachments: image-2019-06-18-17-59-42-136.png We observe that in many scenarios, the operations/algorithms are based on an array of MemorySegment. These memory segments form a large, combined, and continuous memory space. For example, suppose we have an array of n memory segments. Memory addresses from 0 to segment_size - 1 are served by the first memory segment; memory addresses from segment_size to 2 * segment_size - 1 are served by the second memory segment, and so on. Specific algorithms decide the actual MemorySegment to serve the operation requests. For some rare cases, two or more memory segments serve the requests. There are many operations based on such a paradigm, for example, {{BinaryString#matchAt}}, {{SegmentsUtil#copyToBytes}}, {{LongHashPartition#MatchIterator#get}}, etc. The problem is that, for memory segment array based operations, a large amount of code is devoted to: 1. Computing the memory segment index & offset within the memory segment. 2. Processing boundary cases. For example, to write an integer, there may be only 2 bytes left in the first memory segment, and the remaining 2 bytes must be written to the next memory segment. 3. Differentiating processing for short/long data. For example, when copying memory data to a byte array, different methods are implemented for the cases when 1) the data fits in a single segment; 2) the data spans multiple segments. Therefore, there is much duplicated code to achieve the above purposes. What is worse, this paradigm significantly increases the amount of code, making the code more difficult to read and maintain. Furthermore, it easily gives rise to bugs which are difficult to find and debug. To address these problems, we propose a new type of memory segment: {{ContainerMemorySegment}}. It is based on an array of underlying memory segments with the same size. It extends from the {{MemorySegment}} base class, so it provides all the functionality provided by {{MemorySegment}}. In addition, it hides all the details of dealing with the specific memory segments, and acts as if it were a big continuous memory region. A prototype implementation is given below: !image-2019-06-18-17-59-42-136.png|thumbnail! With this new type of memory segment, many operations/algorithms can be greatly simplified, without affecting performance. This is because: 1. Many checks and boundary-processing routines are already there; we just move them to the new class. 2. We optimize the implementation of the new class, so the special optimizations (e.g. optimizations for short data) are still preserved. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
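The boundary case described in the issue (a 4-byte integer with only 2 bytes left in the current segment) can be sketched as follows. This is the kind of index/offset and spill-over logic the proposed ContainerMemorySegment would centralize; `byte[][]` stands in for the MemorySegment array, the byte-at-a-time loop and little-endian order are illustrative choices, and `SpanningWrite` is a hypothetical name:

```java
// Hypothetical sketch: write/read an int at an address that may span two segments.
final class SpanningWrite {
    static void writeInt(byte[][] segments, int segmentSize, int address, int value) {
        // Byte-at-a-time so the write transparently crosses a segment boundary.
        for (int i = 0; i < 4; i++) {
            int byteAddress = address + i;
            byte b = (byte) (value >>> (8 * i)); // little-endian, illustrative
            segments[byteAddress / segmentSize][byteAddress % segmentSize] = b;
        }
    }

    static int readInt(byte[][] segments, int segmentSize, int address) {
        int result = 0;
        for (int i = 0; i < 4; i++) {
            int byteAddress = address + i;
            int b = segments[byteAddress / segmentSize][byteAddress % segmentSize] & 0xFF;
            result |= b << (8 * i);
        }
        return result;
    }
}
```

With 8-byte segments, a write at address 6 puts 2 bytes in the first segment and the remaining 2 bytes in the second, exactly the case in the issue description.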
[jira] [Created] (FLINK-12879) Improve the performance of AbstractBinaryWriter
Liya Fan created FLINK-12879: Summary: Improve the performance of AbstractBinaryWriter Key: FLINK-12879 URL: https://issues.apache.org/jira/browse/FLINK-12879 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Reporter: Liya Fan Assignee: Liya Fan Improve the performance of AbstractBinaryWriter by:
1. removing an unnecessary memory copy;
2. improving the performance of rounding the buffer size.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
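"Rounding the buffer size" here typically means rounding a requested capacity up to the next power of two. A common bit-twiddling sketch (an assumption about the intent; not necessarily the implementation used in the ticket, and `BufferRounding` is a hypothetical name):

```java
// Hypothetical helper: round a requested capacity up to the next power of two
// using Integer.highestOneBit instead of a loop of doublings.
final class BufferRounding {
    static int roundUpToPowerOfTwo(int x) {
        if (x <= 1) {
            return 1;
        }
        // highestOneBit(x - 1) is the largest power of two <= x - 1;
        // shifting left by one yields the smallest power of two >= x.
        return Integer.highestOneBit(x - 1) << 1;
    }
}
```

For example, a request of 17 bytes rounds to 32, while 16 stays 16 (subtracting 1 first keeps exact powers of two unchanged).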
[jira] [Commented] (FLINK-12811) flink-table-planner-blink compile error
[ https://issues.apache.org/jira/browse/FLINK-12811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861775#comment-16861775 ] Liya Fan commented on FLINK-12811: -- I tried two different versions of JDK1.8, but could not reproduce this problem. > flink-table-planner-blink compile error > --- > > Key: FLINK-12811 > URL: https://issues.apache.org/jira/browse/FLINK-12811 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: vinoyang >Priority: Major > > {code:java} > 10:32:18.054 [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile > (default-compile) on project flink-table-planner-blink_2.11: Compilation > failure: Compilation failure: > 10:32:18.054 [ERROR] > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java:[35,35] > cannot find symbol > 10:32:18.055 [ERROR] symbol: class JavaScalaConversionUtil > 10:32:18.055 [ERROR] location: package org.apache.flink.table.util > 10:32:18.057 [ERROR] > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java:[132,33] > incompatible types: org.apache.calcite.plan.RelOptPlanner is not a > functional interface > 10:32:18.058 [ERROR] multiple non-overriding abstract methods found in > interface org.apache.calcite.plan.RelOptPlanner > 10:32:18.058 [ERROR] > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java:[174,24] > cannot find symbol > 10:32:18.058 [ERROR] symbol: variable JavaScalaConversionUtil > 10:32:18.059 [ERROR] location: class > org.apache.flink.table.planner.PlannerContext > 10:32:18.060 [ERROR] > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java:[190,24] > cannot find symbol > 10:32:18.061 [ERROR] symbol: 
variable JavaScalaConversionUtil > 10:32:18.061 [ERROR] location: class > org.apache.flink.table.planner.PlannerContext > 10:32:18.062 [ERROR] > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java:[204,24] > cannot find symbol > 10:32:18.062 [ERROR] symbol: variable JavaScalaConversionUtil > 10:32:18.063 [ERROR] location: class > org.apache.flink.table.planner.PlannerContext > 10:32:18.063 [ERROR] > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java:[219,86] > incompatible types: > scala.collection.mutable.ListBuffer > cannot be converted to java.util.List > {code} > log details: [https://api.travis-ci.org/v3/job/544108356/log.txt] > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12750) Getting Started - Docker Playground - Interactive SQL Playground
[ https://issues.apache.org/jira/browse/FLINK-12750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12750: - Component/s: Documentation > Getting Started - Docker Playground - Interactive SQL Playground > - > > Key: FLINK-12750 > URL: https://issues.apache.org/jira/browse/FLINK-12750 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Konstantin Knauf >Priority: Major > > The planned structure for the new Getting Started Guide is > * Flink Overview (~ two pages) > * Project Setup > * Quickstarts > ** Example Walkthrough - Table API / SQL > ** Example Walkthrough - DataStream API > * Docker Playgrounds > ** Flink Cluster Playground > ** Flink Interactive SQL Playground > In this ticket we add the Flink Interactive SQL Playground, a docker-compose based > setup consisting of Apache Kafka and Apache Flink (Flink Session Cluster) and > an SQL Client. > The general setup should be in line with FLINK-12749. > **Open Questions** > * Where to host the SQL Client image? Can we somehow also use existing plain > Flink images? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12749) Getting Started - Docker Playgrounds - Flink Cluster Playground
[ https://issues.apache.org/jira/browse/FLINK-12749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12749: - Component/s: Documentation > Getting Started - Docker Playgrounds - Flink Cluster Playground > --- > > Key: FLINK-12749 > URL: https://issues.apache.org/jira/browse/FLINK-12749 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Major > > The planned structure for the new Getting Started Guide is > * Flink Overview (~ two pages) > * Project Setup > * Quickstarts > ** Example Walkthrough - Table API / SQL > ** Example Walkthrough - DataStream API > * Docker Playgrounds > ** Flink Cluster Playground > ** Flink Interactive SQL Playground > In this ticket we add the Flink Cluster Playground, a docker-compose based > setup consisting of Apache Kafka and Apache Flink (Flink Session Cluster), > including a step-by-step guide for some common commands (job submission, > savepoints, etc). > *Some Open Questions:* > * Which Flink images to use? `library/flink` with dynamic properties would be > the most maintainable, I think. It would be preferable if we don't need to > host any custom images for this, but can rely on the existing plain Flink > images. > * Which Flink jobs to use? An updated version of > {{org.apache.flink.streaming.examples.statemachine.StateMachineExample}} > might be a good option as it can run with or without Kafka and contains a data > generator writing to Kafka already (see next questions). > * How to get data into Kafka? Maybe just provide a small bash > script/one-liner to produce into a Kafka topic, or see the question above. > * Which Kafka images to use? https://hub.docker.com/r/wurstmeister/kafka/ > seems to be well-maintained and is openly available. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12748) Getting Started - Flink Overview
[ https://issues.apache.org/jira/browse/FLINK-12748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12748: - Component/s: Documentation > Getting Started - Flink Overview > > > Key: FLINK-12748 > URL: https://issues.apache.org/jira/browse/FLINK-12748 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Konstantin Knauf >Priority: Major > > The planned structure for the new Getting Started Guide is > * Flink Overview (~ two pages) > * Project Setup > * Quickstarts > ** Example Walkthrough - Table API / SQL > ** Example Walkthrough - DataStream API > * Docker Playgrounds > ** Flink Cluster Playground > ** Flink Interactive SQL Playground > In this ticket we should add a 1-2 page introduction & overview of Apache > Flink including among other things an overview of the API Stack (DataStream > API & SQL/Table API), an introduction to *stateful* stream processing and > Flink's role in an overall stream processing architecture. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12747) Getting Started - Table API Example Walkthrough
[ https://issues.apache.org/jira/browse/FLINK-12747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12747: - Component/s: Documentation > Getting Started - Table API Example Walkthrough > --- > > Key: FLINK-12747 > URL: https://issues.apache.org/jira/browse/FLINK-12747 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Konstantin Knauf >Priority: Major > > The planned structure for the new Getting Started Guide is > * Flink Overview (~ two pages) > * Project Setup > * Quickstarts > ** Example Walkthrough - Table API / SQL > ** Example Walkthrough - DataStream API > * Docker Playgrounds > ** Flink Cluster Playground > ** Flink Interactive SQL Playground > This tickets adds the Example Walkthrough for the Table API, which should > follow the same structure as the DataStream Example (FLINK-12746), which > needs to be completed first. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12746) Getting Started - Project Setup and DataStream Example Walkthrough
[ https://issues.apache.org/jira/browse/FLINK-12746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12746: - Component/s: Documentation > Getting Started - Project Setup and DataStream Example Walkthrough > -- > > Key: FLINK-12746 > URL: https://issues.apache.org/jira/browse/FLINK-12746 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Konstantin Knauf >Priority: Major > > The planned structure for the new Getting Started Guide is > * Flink Overview (~ two pages) > * Project Setup > * Quickstarts > ** Example Walkthrough - Table API / SQL > ** Example Walkthrough - DataStream API > * Docker Playgrounds > ** Flink Cluster Playground > ** Flink Interactive SQL Playground > In this ticket we should add "Project Setup" and "Quickstarts -> Example > Walkthrough - DataStream API", which covers everything we have today. > This will replace the current "Tutorials" and "Examples" sections, which can > be removed as part of this ticket as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12730) Combine BitSet implementations in flink-runtime
Liya Fan created FLINK-12730: Summary: Combine BitSet implementations in flink-runtime Key: FLINK-12730 URL: https://issues.apache.org/jira/browse/FLINK-12730 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Liya Fan Assignee: Liya Fan There are two implementations of BitSet in the flink-runtime component: one is org.apache.flink.runtime.operators.util.BloomFilter#BitSet, while the other is org.apache.flink.runtime.operators.util.BitSet. The two classes are quite similar in their API and implementations. The only difference is that the former operates on longs while the latter operates on bytes. This has the following consequences: # The byte-based BitSet has better performance for get/set operations. # The long-based BitSet has better performance for the clear operation. We combine the two implementations to get the best of both worlds. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
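The trade-off described in the ticket can be sketched in a few lines. This is a hedged illustration with made-up names, not the actual Flink BloomFilter#BitSet or BitSet code: backing the set with a long[] keeps get/set a single shift-and-mask on one word, while clear() sweeps eight bytes per store instead of one.

```java
import java.util.Arrays;

// Illustrative sketch only -- not Flink's actual BitSet implementations.
class CombinedBitSet {
    private final long[] words;

    CombinedBitSet(int bitLength) {
        // One 64-bit word per 64 bits, rounded up.
        this.words = new long[(bitLength + 63) >>> 6];
    }

    void set(int index) {
        // index >>> 6 selects the word, index & 63 the bit within it.
        words[index >>> 6] |= 1L << (index & 63);
    }

    boolean get(int index) {
        return (words[index >>> 6] & (1L << (index & 63))) != 0;
    }

    // Word-at-a-time clear: 8x fewer stores than a byte[]-backed bit set.
    void clear() {
        Arrays.fill(words, 0L);
    }
}
```

Get/set on a long[] is still a single array access plus shift/mask, so combining loses little on the read/write path while keeping the cheap bulk clear.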
[jira] [Updated] (FLINK-12687) ByteHashSet is always in dense mode
[ https://issues.apache.org/jira/browse/FLINK-12687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12687: - Component/s: (was: Runtime / Operators) Table SQL / Runtime > ByteHashSet is always in dense mode > --- > > Key: FLINK-12687 > URL: https://issues.apache.org/jira/browse/FLINK-12687 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Since there are only 256 possible byte values, the largest possible range is > 255, and the condition > range < OptimizableHashSet.DENSE_THRESHOLD > must be satisfied. So ByteHashSet must be in dense mode. > We can make use of this to improve the performance and code structure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12701) Column name alias causes exception when used with where and group-by
[ https://issues.apache.org/jira/browse/FLINK-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16854154#comment-16854154 ] Liya Fan commented on FLINK-12701: -- Hi [~josh.bradt], thanks for reporting this problem. This seems to be a real bug in Flink. Would you please provide more details to reproduce the problem, such as the table schema and sample data? > Column name alias causes exception when used with where and group-by > > > Key: FLINK-12701 > URL: https://issues.apache.org/jira/browse/FLINK-12701 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.8.0 >Reporter: Josh Bradt >Priority: Major > > Assigning a column an alias containing a space sometimes causes an exception > even though the docs suggest this is valid. > Assume we have a table {{Groups}} that contains a string-typed column called > {{name}}. Then the query > {code:sql} > SELECT `Groups`.`name` AS `Group Name` FROM `Groups` > {code} > works as expected, but > {code:sql} > SELECT `Groups`.`name` AS `Group Name` > FROM `Groups` > WHERE `Groups`.`name` LIKE 'Treat%' > ORDER BY `Groups`.`name` ASC > {code} > produces the following exception > {code:java} > org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: > Invalid tuple field reference "Group Name". 
> at > org.apache.flink.api.java.typeutils.RowTypeInfo.getFlatFields(RowTypeInfo.java:97) > at > org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:266) > at > org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:223) > at org.apache.flink.api.java.DataSet.partitionByRange(DataSet.java:1298) > at > org.apache.flink.table.plan.nodes.dataset.DataSetSort.translateToPlan(DataSetSort.scala:99) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476) > at > org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:311) > at > org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879) > at org.apache.flink.table.api.Table.insertInto(table.scala:1126) > [...] > {code} > If you remove the {{WHERE}} clause or the {{ORDER BY}} clause, it works fine. > It only fails when both are included. Additionally, it works fine if the > column alias ({{AS `Group Name`}}) is removed or if it doesn't contain a > space ({{AS `GroupName`}}). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12687) ByteHashSet is always in dense mode
Liya Fan created FLINK-12687: Summary: ByteHashSet is always in dense mode Key: FLINK-12687 URL: https://issues.apache.org/jira/browse/FLINK-12687 Project: Flink Issue Type: Improvement Components: Runtime / Operators Reporter: Liya Fan Assignee: Liya Fan Since there are only 256 possible byte values, the largest possible range is 255, and the condition range < OptimizableHashSet.DENSE_THRESHOLD must be satisfied. So ByteHashSet must be in dense mode. We can make use of this to improve the performance and code structure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
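The argument above can be made concrete with a minimal sketch (illustrative only, not the actual ByteHashSet code): a byte has exactly 256 possible values, so a fixed 256-slot array always covers the full range, and membership is a single array index with no hashing, probing, or sparse fallback.

```java
// Illustrative dense set over all possible byte values.
class DenseByteSet {
    // 256 slots cover every byte value; index = value shifted to 0..255.
    private final boolean[] used = new boolean[256];

    void add(byte b) {
        // b & 0xFF maps the signed range -128..127 onto 0..255.
        used[b & 0xFF] = true;
    }

    boolean contains(byte b) {
        return used[b & 0xFF];
    }
}
```

Since the worst-case range (255) is already below any reasonable dense threshold, the sparse code path can never be taken for bytes, which is the simplification the ticket proposes.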
[jira] [Commented] (FLINK-12620) Deadlock in task deserialization
[ https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850307#comment-16850307 ] Liya Fan commented on FLINK-12620: -- [~mikekap], thanks for the clarification. Would you please give more details about the Task and classes A, B, and C? I want to know whether this is a general case or a special case. > Deadlock in task deserialization > > > Key: FLINK-12620 > URL: https://issues.apache.org/jira/browse/FLINK-12620 > Project: Flink > Issue Type: Bug >Affects Versions: 1.8.0 >Reporter: Mike Kaplinskiy >Priority: Major > Attachments: jstack_snippet.txt > > > When running a batch job, I ran into an issue where task deserialization > caused a deadlock. Specifically, if you have a static initialization > dependency graph that looks like this (these are all classes): > {code:java} > Task1 depends on A > A depends on B > B depends on C > C depends on B [cycle] > Task2 depends on C{code} > What seems to happen is a deadlock. Specifically, threads are started on the > task managers that simultaneously call BatchTask.instantiateUserCode on both > Task1 and Task2. This starts deserializing the classes and initializing them. > Here's the deadlock scenario, as a stack: > {code:java} > Time> > T1: [deserialize] -> Task1 -> A -> B -> (wait for > C) > T2: [deserialize] -> Task2 -> C -> (wait for > B){code} > > A similar scenario from the web: > [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] . > > For my specific problem, I'm running into this within Clojure - > {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep on > {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. > Deserializing different clojure functions calls one or the other first which > deadlocks task managers. > > I built a version of flink-core that had > {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} > synchronized, but I'm not sure that it's the proper fix. 
I'm happy to submit > that as a patch, but I'm not familiar enough with the codebase to say that > it's the correct solution - ideally all Java class loading is synchronized, > but I'm not sure how to do that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12620) Deadlock in task deserialization
[ https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849567#comment-16849567 ] Liya Fan commented on FLINK-12620: -- [~mikekap], thanks for providing additional details. I don't quite understand why making the method synchronized solves the problem. In the example you provided, if T1 enters the synchronization block first, does it have to wait for C? If so, it seems the deadlock still exists. If not, why? Time> T1: [deserialize] -> Task1 -> A -> B -> (wait for C) T2: [deserialize] -> Task2 -> C -> (wait for B) > Deadlock in task deserialization > > > Key: FLINK-12620 > URL: https://issues.apache.org/jira/browse/FLINK-12620 > Project: Flink > Issue Type: Bug >Affects Versions: 1.8.0 >Reporter: Mike Kaplinskiy >Priority: Major > Attachments: jstack_snippet.txt > > > When running a batch job, I ran into an issue where task deserialization > caused a deadlock. Specifically, if you have a static initialization > dependency graph that looks like this (these are all classes): > {code:java} > Task1 depends on A > A depends on B > B depends on C > C depends on B [cycle] > Task2 depends on C{code} > What seems to happen is a deadlock. Specifically, threads are started on the > task managers that simultaneously call BatchTask.instantiateUserCode on both > Task1 and Task2. This starts deserializing the classes and initializing them. > Here's the deadlock scenario, as a stack: > {code:java} > Time> > T1: [deserialize] -> Task1 -> A -> B -> (wait for > C) > T2: [deserialize] -> Task2 -> C -> (wait for > B){code} > > A similar scenario from the web: > [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] . > > For my specific problem, I'm running into this within Clojure - > {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with > {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. 
> Deserializing different clojure functions calls one or the other first which > deadlocks task managers. > > I built a version of flink-core that had > {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} > synchronized, but I'm not sure that it's the proper fix. I'm happy to submit > that as a patch, but I'm not familiar enough with the codebase to say that > it's the correct solution - ideally all Java class loading is synchronized, > but I'm not sure how to do that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12620) Deadlock in task deserialization
[ https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848865#comment-16848865 ] Liya Fan commented on FLINK-12620: -- [~mikekap] Thank you for finding this problem. Would you please give more details about the problem, like the example code, or your patch? > Deadlock in task deserialization > > > Key: FLINK-12620 > URL: https://issues.apache.org/jira/browse/FLINK-12620 > Project: Flink > Issue Type: Bug >Affects Versions: 1.8.0 >Reporter: Mike Kaplinskiy >Priority: Major > > When running a batch job, I ran into an issue where task deserialization > caused a deadlock. Specifically, if you have a static initialization > dependency graph that looks like this (these are all classes): > {code:java} > Task1 depends on A > A depends on B > B depends on C > C depends on B [cycle] > Task2 depends on C{code} > What seems to happen is a deadlock. Specifically, threads are started on the > task managers that simultaneously call BatchTask.instantiateUserCode on both > Task1 and Task2. This starts deserializing the classes and initializing them. > Here's the deadlock scenario, as a stack: > {code:java} > Time> > T1: [deserialize] -> Task1 -> A -> B -> (wait for > C) > T2: [deserialize] -> Task2 -> C -> (wait for > B){code} > > A similar scenario from the web: > [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] . > > For my specific problem, I'm running into this within Clojure - > {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with > {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. > Deserializing different clojure functions calls one or the other first which > deadlocks task managers. > > I built a version of flink-core that had > {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} > synchronized, but I'm not sure that it's the proper fix. 
I'm happy to submit > that as a patch, but I'm not familiar enough with the codebase to say that > it's the correct solution - ideally all Java class loading is synchronized, > but I'm not sure how to do that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
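The reported cycle can be reproduced in miniature (an illustrative sketch; class names echo the report, not any Flink or Clojure code). Touched from a single thread, the JVM resolves the cycle because class initialization is re-entrant for the thread that holds the initialization lock; the deadlock only appears when two threads first touch B and C concurrently, which is exactly what parallel task deserialization does. Whether coarser synchronization around deserialization fully removes the race is the open question in the comments above.

```java
// Illustrative JLS-style initialization cycle: B's static initializer
// triggers C's, and C's refers back into B.
class B {
    static final int VALUE;
    static int seed() { return 10; }
    static {
        // Triggers C.<clinit>. Safe in one thread: when C calls back into B,
        // the JVM sees B is already being initialized by the current thread
        // and lets the call proceed (re-entrant initialization).
        VALUE = C.seed() + 1;
    }
}

class C {
    static final int VALUE;
    static int seed() { return 20; }
    static {
        // Refers back into B.<clinit> -- the cycle. If two threads first
        // touch B and C at the same time, each blocks on the other's
        // initialization lock: the deadlock reported above.
        VALUE = B.seed() + 1;
    }
}
```

Because seed() returns a constant regardless of initialization state, the single-threaded outcome is deterministic: the class touched second finishes first (VALUE 11), then the first finishes (VALUE 21).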
[jira] [Commented] (FLINK-12593) Revise the document for CEP
[ https://issues.apache.org/jira/browse/FLINK-12593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846399#comment-16846399 ] Liya Fan commented on FLINK-12593: -- Hi [~jark], thank you so much for sharing the good news and the documents. I will take a closer look at the documents. However, I guess I also need to get more knowledge about CEP, before I can give some useful comments about it :) > Revise the document for CEP > --- > > Key: FLINK-12593 > URL: https://issues.apache.org/jira/browse/FLINK-12593 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Liya Fan >Priority: Minor > > The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to > understand and follow, especially for beginners. > I suggest revising from the following aspects: > 1. Give more detailed descriptions of existing examples. > 2. More examples are required to illustrate the features. > 3. More explanations are required for some concepts, like contiguity. > 4. We can add more references to better understand the concepts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12586) Stderr and stdout are reversed in OptimizerPlanEnvironment
[ https://issues.apache.org/jira/browse/FLINK-12586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846381#comment-16846381 ] Liya Fan commented on FLINK-12586: -- Hi [~shinhira_kazunori], thanks for finding this problem. I have provided a fix. Please take a look. > Stderr and stdout are reversed in OptimizerPlanEnvironment > -- > > Key: FLINK-12586 > URL: https://issues.apache.org/jira/browse/FLINK-12586 > Project: Flink > Issue Type: Bug > Components: Command Line Client >Affects Versions: 1.7.2, 1.8.0 >Reporter: Kazunori Shinhira >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In the OptimizerPlanEnvironment#getOptimizedPlan method, it looks like stdout > is output as System.err and stderr is output as System.out. > [https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L107-L108] > > I think it should be as below. > {code:java} > throw new ProgramInvocationException( > "The program plan could not be fetched - the program aborted pre-maturely." > + "\n\nSystem.err: " + (stderr.length() == 0 ? "(none)" : stderr) > + "\n\nSystem.out: " + (stdout.length() == 0 ? "(none)" : stdout)); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12593) Revise the document for CEP
Liya Fan created FLINK-12593: Summary: Revise the document for CEP Key: FLINK-12593 URL: https://issues.apache.org/jira/browse/FLINK-12593 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Liya Fan The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to understand and follow, especially for beginners. I suggest revising from the following aspects: 1. Give more detailed descriptions of existing examples. 2. More examples are required to illustrate the features. 3. More explanations are required for some concepts, like contiguity. 4. We can add more references to better understand the concepts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
[ https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845757#comment-16845757 ] Liya Fan commented on FLINK-12319: -- [~mpf] The problem has been fixed by our [PR|https://github.com/apache/flink/pull/8511]. Would you please take a look? > StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer > --- > > Key: FLINK-12319 > URL: https://issues.apache.org/jira/browse/FLINK-12319 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.6.4, 1.7.2, 1.8.0 > Environment: Ubuntu 18.04 > openjdk version "1.8.0_191" > OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12) > OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode) >Reporter: Marco Pfatschbacher >Assignee: Liya Fan >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > > I wrote a simple SourceFunction that creates Events in a loop. > The CEP pattern is very simple: > {code:java} > final Pattern failurePattern = > Pattern.begin("5 or more failures", > AfterMatchSkipStrategy.skipPastLastEvent()) > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("failed"); > } > }) > .times(5) > .next("1 or more successes") > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("success"); > } > }) > .times(1) > .within(Time.milliseconds(20)); > {code} > > After about 100k Events, Flink aborts with this stacktrace: > > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263) > at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41) > Caused by: java.lang.StackOverflowError > at > org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85) > at > org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > 
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at >
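The alternating releaseNode -> removeNode frames in the trace show mutual recursion walking a long chain of buffer entries, one stack frame per entry. A standard fix for this kind of StackOverflowError is to replace the recursion with an explicit work stack; the sketch below uses illustrative names and a simplified model (not the actual SharedBuffer API, and not necessarily the exact approach of the PR mentioned above), so the chain consumes heap rather than Java call-stack frames.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified model: each node holds a reference count and at most one
// predecessor; releasing a node may cascade to its predecessor.
class RefCountedChain {
    static class Node {
        int refCount;
        Integer predecessor; // id of the referenced node, or null

        Node(int refCount, Integer predecessor) {
            this.refCount = refCount;
            this.predecessor = predecessor;
        }
    }

    final Map<Integer, Node> nodes = new HashMap<>();

    // Iteratively releases nodeId and every predecessor whose count drops
    // to zero -- the explicit Deque replaces the recursive call chain.
    void release(int nodeId) {
        Deque<Integer> work = new ArrayDeque<>();
        work.push(nodeId);
        while (!work.isEmpty()) {
            Node node = nodes.get(work.pop());
            if (node == null || --node.refCount > 0) {
                continue; // still referenced elsewhere, stop the cascade
            }
            // Count hit zero: remove the node and schedule its predecessor
            // (this push is where the recursive call used to be).
            nodes.remove(node.predecessor == null ? -1 : 0); // placeholder, see below
        }
    }
}
```

Corrected release loop (the removal must use the popped id, kept explicit here):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class IterativeRelease {
    static class Node {
        int refCount;
        Integer predecessor;

        Node(int refCount, Integer predecessor) {
            this.refCount = refCount;
            this.predecessor = predecessor;
        }
    }

    final Map<Integer, Node> nodes = new HashMap<>();

    void release(int nodeId) {
        Deque<Integer> work = new ArrayDeque<>();
        work.push(nodeId);
        while (!work.isEmpty()) {
            int id = work.pop();
            Node node = nodes.get(id);
            if (node == null || --node.refCount > 0) {
                continue;
            }
            nodes.remove(id);                // count hit zero: drop the node
            if (node.predecessor != null) {
                work.push(node.predecessor); // was the recursive call
            }
        }
    }
}
```

With this shape, a chain of hundreds of thousands of entries (as the reporter's ~100k events produce) is released in a plain loop with O(1) stack depth.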
[jira] [Assigned] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
[ https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan reassigned FLINK-12319: Assignee: Liya Fan > StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer > --- > > Key: FLINK-12319 > URL: https://issues.apache.org/jira/browse/FLINK-12319 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.6.4, 1.7.2, 1.8.0 > Environment: Ubuntu 18.04 > openjdk version "1.8.0_191" > OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12) > OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode) >Reporter: Marco Pfatschbacher >Assignee: Liya Fan >Priority: Major > > > I wrote a simple SourceFunction that creates Events in a loop. > The CEP pattern is very simple: > {code:java} > final Pattern failurePattern = > Pattern.begin("5 or more failures", > AfterMatchSkipStrategy.skipPastLastEvent()) > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("failed"); > } > }) > .times(5) > .next("1 or more successes") > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("success"); > } > }) > .times(1) > .within(Time.milliseconds(20)); > {code} > > After about 100k Events, Flink aborts with this stacktrace: > > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263) > at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41) > Caused by: java.lang.StackOverflowError > at > org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85) > at > org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > 
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at >
[jira] [Assigned] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
[ https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan reassigned FLINK-12319: Assignee: (was: Liya Fan) > StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer > --- > > Key: FLINK-12319 > URL: https://issues.apache.org/jira/browse/FLINK-12319 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.6.4, 1.7.2, 1.8.0 > Environment: Ubuntu 18.04 > openjdk version "1.8.0_191" > OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12) > OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode) >Reporter: Marco Pfatschbacher >Priority: Major > > > I wrote a simple SourceFunction that creates Events in a loop. > The CEP pattern is very simple: > {code:java} > final Pattern failurePattern = > Pattern.begin("5 or more failures", > AfterMatchSkipStrategy.skipPastLastEvent()) > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("failed"); > } > }) > .times(5) > .next("1 or more successes") > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("success"); > } > }) > .times(1) > .within(Time.milliseconds(20)); > {code} > > After about 100k Events, Flink aborts with this stacktrace: > > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263) > at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41) > Caused by: java.lang.StackOverflowError > at > org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85) > at > org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > 
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at >
[jira] [Assigned] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
[ https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan reassigned FLINK-12319: Assignee: Liya Fan > StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer > --- > > Key: FLINK-12319 > URL: https://issues.apache.org/jira/browse/FLINK-12319 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.6.4, 1.7.2, 1.8.0 > Environment: Ubuntu 18.04 > openjdk version "1.8.0_191" > OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12) > OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode) >Reporter: Marco Pfatschbacher >Assignee: Liya Fan >Priority: Major > > > I wrote a simple SourceFunction that creats Events in a loop. > The CEP pattern is very simple: > {code:java} > final Pattern failurePattern = > Pattern.begin("5 or more failures", > AfterMatchSkipStrategy.skipPastLastEvent()) > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("failed"); > } > }) > .times(5) > .next("1 or more successes") > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("success"); > } > }) > .times(1) > .within(Time.milliseconds(20)); > {code} > > After about 100k Events, Flink aborts with this stacktrace: > > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263) > at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41) > Caused by: java.lang.StackOverflowError > at > org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85) > at > org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > 
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at >
[jira] [Commented] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
[ https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845566#comment-16845566 ] Liya Fan commented on FLINK-12319: -- Hi [~mpf], thank you for your feedback. I am looking at this issue. > StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer > --- > > Key: FLINK-12319 > URL: https://issues.apache.org/jira/browse/FLINK-12319 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.6.4, 1.7.2, 1.8.0 > Environment: Ubuntu 18.04 > openjdk version "1.8.0_191" > OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12) > OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode) >Reporter: Marco Pfatschbacher >Priority: Major > > > I wrote a simple SourceFunction that creats Events in a loop. > The CEP pattern is very simple: > {code:java} > final Pattern failurePattern = > Pattern.begin("5 or more failures", > AfterMatchSkipStrategy.skipPastLastEvent()) > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("failed"); > } > }) > .times(5) > .next("1 or more successes") > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("success"); > } > }) > .times(1) > .within(Time.milliseconds(20)); > {code} > > After about 100k Events, Flink aborts with this stacktrace: > > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263) > at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41) > Caused by: java.lang.StackOverflowError > at > org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85) > at > org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > 
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at >
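The trace above alternates between releaseNode and removeNode, so each released node that in turn removes its predecessor adds two stack frames; a long chain of single-reference nodes therefore overflows the stack. A hypothetical sketch of the usual fix is to flatten the mutual recursion into an explicit work stack. The Node interface and the pool map below are simplified stand-ins for SharedBuffer's NodeId/Lockable state, not the real API:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Sketch: replace the releaseNode()/removeNode() mutual recursion with an
// iterative loop over an explicit stack, so chain length no longer bounds
// the call-stack depth. All types here are simplified stand-ins.
final class IterativeRelease {
    interface Node {
        boolean release();          // decrement the ref count; true when it drops to zero
        Iterable<String> edges();   // ids of predecessor nodes this node references
    }

    static void releaseNode(String startId, Map<String, Node> pool) {
        Deque<String> pending = new ArrayDeque<>();
        pending.push(startId);
        while (!pending.isEmpty()) {
            Node node = pool.get(pending.pop());
            if (node != null && node.release()) {
                // Removing this node releases each edge target: enqueue the
                // targets instead of recursing into releaseNode() again.
                for (String target : node.edges()) {
                    pending.push(target);
                }
            }
        }
    }
}
```

With this shape, a chain of 100k nodes (the scale the reporter hit) is released in a plain loop instead of 200k nested calls.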
[jira] [Updated] (FLINK-12335) Remove useless code in SegmentsUtil
[ https://issues.apache.org/jira/browse/FLINK-12335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12335: - Summary: Remove useless code in SegmentsUtil (was: Improvement the performance of class SegmentsUtil) > Remove useless code in SegmentsUtil > --- > > Key: FLINK-12335 > URL: https://issues.apache.org/jira/browse/FLINK-12335 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Improve the performance of class SegmentsUtil: > To evaluate the offset, an integer is bitwise-ANDed with a mask to clear the low > bits, and then shifted right. The bitwise AND is useless: > ((index & BIT_BYTE_POSITION_MASK) >>> 3) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
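The redundancy described in the ticket is easy to check: masking off the low three bits before an unsigned right shift by three changes nothing, because the shift discards those bits anyway. A minimal sketch, assuming the mask clears the low 3 bits (0xFFFFFFF8, as the expression implies):

```java
// Minimal check of the equivalence claimed in the ticket. The mask value is
// an assumption inferred from the expression, not copied from SegmentsUtil.
public final class ByteIndexCheck {
    private static final int BIT_BYTE_POSITION_MASK = 0xFFFFFFF8;

    static int withMask(int index) {
        return (index & BIT_BYTE_POSITION_MASK) >>> 3;
    }

    static int withoutMask(int index) {
        return index >>> 3; // the unsigned shift already discards bits 0-2
    }
}
```

The two forms agree for every int, including negative values, which is why the AND can be dropped.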
[jira] [Updated] (FLINK-12553) Fix a bug in SqlDateTimeUtils#parseToTimeMillis
[ https://issues.apache.org/jira/browse/FLINK-12553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12553: - Description: If parameter "1999-12-31 12:34:56.123" is used, it should return 123. But it returns 1230 now. (was: If parameter "1999-12-31 12:34:56.123" is used, it should return 123. But it returns 230 now.) > Fix a bug in SqlDateTimeUtils#parseToTimeMillis > --- > > Key: FLINK-12553 > URL: https://issues.apache.org/jira/browse/FLINK-12553 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Trivial > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > If parameter "1999-12-31 12:34:56.123" is used, it should return 123. But it > returns 1230 now. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
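The bug class here is a classic one: the fractional-second field must be normalized to exactly three digits before being read as milliseconds, otherwise ".123" can be scaled as if digits were missing. A hedged sketch of the intended behavior; parseFractionMillis is a hypothetical helper for illustration, not the actual SqlDateTimeUtils code:

```java
// Hypothetical helper: convert the digits after the decimal point of a
// timestamp ("123" in "1999-12-31 12:34:56.123") to milliseconds by
// truncating to at most three digits and scaling short fractions up.
public final class FractionMillis {
    static int parseFractionMillis(String fraction) {
        // ".1" -> 100 ms, ".12" -> 120 ms, ".123" -> 123 ms, ".123456" -> 123 ms
        String digits = fraction.length() > 3 ? fraction.substring(0, 3) : fraction;
        int value = Integer.parseInt(digits);
        for (int i = digits.length(); i < 3; i++) {
            value *= 10; // each missing digit is one factor of ten
        }
        return value;
    }
}
```

Scaling an already-three-digit fraction by an extra factor of ten is exactly how "123" becomes 1230.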
[jira] [Created] (FLINK-12553) Fix a bug in SqlDateTimeUtils#parseToTimeMillis
Liya Fan created FLINK-12553: Summary: Fix a bug in SqlDateTimeUtils#parseToTimeMillis Key: FLINK-12553 URL: https://issues.apache.org/jira/browse/FLINK-12553 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Reporter: Liya Fan Assignee: Liya Fan If parameter "1999-12-31 12:34:56.123" is used, it should return 123. But it returns 230 now. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11421) Add compilation options to allow compiling generated code with JDK compiler
[ https://issues.apache.org/jira/browse/FLINK-11421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833687#comment-16833687 ] Liya Fan commented on FLINK-11421: -- [~lzljs3620320] Thanks a lot for your confirmation. >> I think we should know what patterns can be improved and what patterns may >> not be improved. (and how much has it improved) This is an interesting question, and I would like to spend some effort on it. >> I mean you can run these tests manual, just test the JCA logical. Sounds good. Thanks for your suggestion. > Add compilation options to allow compiling generated code with JDK compiler > > > Key: FLINK-11421 > URL: https://issues.apache.org/jira/browse/FLINK-11421 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Major > Labels: pull-request-available > Original Estimate: 240h > Time Spent: 20m > Remaining Estimate: 239h 40m > > Flink supports some operators (like Calc, Hash Agg, Hash Join, etc.) by code > generation. That is, Flink generates their source code dynamically, and then > compiles it into Java bytecode, which is loaded and executed at runtime. > > By default, Flink compiles the generated source code with Janino. This is fast, > as the compilation often finishes in hundreds of milliseconds. The generated > Java bytecode, however, is of poor quality. To illustrate, we use the Java > Compiler API (JCA) to compile the generated code. Experiments on TPC-H (1 TB) > queries show that the E2E time can be more than 10% shorter when operators > are compiled by JCA, even though it takes more time (a few seconds) to > compile with JCA. > > Therefore, we believe it is beneficial to compile generated code with JCA in > the following scenarios: 1) For batch jobs, the E2E time is relatively long, > so it is worth spending more time compiling to generate high-quality > Java bytecode. 2) For repeated stream jobs, the generated code will be > compiled once and run many times. Therefore, it pays to spend more time > compiling the first time, and benefit from the higher bytecode quality in > later runs. > > Based on the above observations, we want to provide a compilation option > (Janino, JCA, or dynamic) for Flink, so that users can choose the one > suitable for their specific scenario and obtain better performance whenever > possible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11421) Add compilation options to allow compiling generated code with JDK compiler
[ https://issues.apache.org/jira/browse/FLINK-11421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833664#comment-16833664 ] Liya Fan commented on FLINK-11421: -- Hi [~lzljs3620320], thanks a lot for the information. Can I restart the PR now? My comments inline: 1. Why does the Java Compiler produce faster code than Janino? Any technical details and evidence? Generally speaking, a compiler that takes longer to compile produces higher-quality results, because the extra time is usually spent applying more optimizations. Native compilers like gcc behave similarly: the optimization levels -O0, -O1, -O2, and -O3 trade longer compilation times for better machine code. 2. Benchmark how fast E2E runs after compiling with JCA? We first noticed that JCA could improve E2E performance while working on vectorization for TPC-H Q1, where JCA compilation gave an improvement of about 27% (from 27-28s down to 20s). We observed similar gains on other TPC-H queries, such as Q12 and Q18. 3. Benchmark how slowly JCA compiles? Good question. A JCA compilation task takes about 2s, which is more than 10 times slower than compiling with Janino. To alleviate the performance impact, we introduce two improvements: 1) Compilation by chain: compilation time does not appear to grow much with the number of source files, so we compile the source code for all operators in a chain in a single batch. 2) Class cache: once a source file is compiled, we save the result in a cache so other tasks in the same JVM can reuse it. 4. Enable the JCA compiler for all table-planner tests? Sounds reasonable. The only drawback is that the tests would take much longer to run, since compiling with JCA is much slower. 
> Add compilation options to allow compiling generated code with JDK compiler > > > Key: FLINK-11421 > URL: https://issues.apache.org/jira/browse/FLINK-11421 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Major > Labels: pull-request-available > Original Estimate: 240h > Time Spent: 20m > Remaining Estimate: 239h 40m > > Flink supports some operators (like Calc, Hash Agg, Hash Join, etc.) by code > generation. That is, Flink generates their source code dynamically, and then > compiles it into Java bytecode, which is loaded and executed at runtime. > > By default, Flink compiles the generated source code with Janino. This is fast, > as the compilation often finishes in hundreds of milliseconds. The generated > Java bytecode, however, is of poor quality. To illustrate, we use the Java > Compiler API (JCA) to compile the generated code. Experiments on TPC-H (1 TB) > queries show that the E2E time can be more than 10% shorter when operators > are compiled by JCA, even though it takes more time (a few seconds) to > compile with JCA. > > Therefore, we believe it is beneficial to compile generated code with JCA in > the following scenarios: 1) For batch jobs, the E2E time is relatively long, > so it is worth spending more time compiling to generate high-quality > Java bytecode. 2) For repeated stream jobs, the generated code will be > compiled once and run many times. Therefore, it pays to spend more time > compiling the first time, and benefit from the higher bytecode quality in > later runs. > > Based on the above observations, we want to provide a compilation option > (Janino, JCA, or dynamic) for Flink, so that users can choose the one > suitable for their specific scenario and obtain better performance whenever > possible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
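What the ticket calls JCA is the standard javax.tools API shipped with the JDK. A minimal sketch of compiling a generated source string with it; the class and method names here are illustrative, not Flink's actual integration, which would compile into a managed class cache rather than the working directory:

```java
import javax.tools.JavaCompiler;
import javax.tools.SimpleJavaFileObject;
import javax.tools.ToolProvider;
import java.net.URI;
import java.util.Collections;

// Sketch: compile an in-memory source string with the JDK compiler.
public final class JdkCompileSketch {

    // Wraps a source string so the compiler can read it without a file on disk.
    static final class StringSource extends SimpleJavaFileObject {
        private final String code;
        StringSource(String className, String code) {
            super(URI.create("string:///" + className.replace('.', '/') + ".java"), Kind.SOURCE);
            this.code = code;
        }
        @Override
        public CharSequence getCharContent(boolean ignoreEncodingErrors) {
            return code;
        }
    }

    static boolean compile(String className, String source) {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); // null on a JRE
        if (compiler == null) {
            throw new IllegalStateException("JDK compiler not available");
        }
        // Null file manager/diagnostics/options: defaults write .class output
        // to the working directory, which is enough for a demonstration.
        JavaCompiler.CompilationTask task = compiler.getTask(
                null, null, null, null, null,
                Collections.singletonList(new StringSource(className, source)));
        return task.call();
    }
}
```

The trade-off discussed in the thread follows directly from this API: a full javac invocation per task costs seconds, which is why batching a whole operator chain and caching compiled classes matter.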
[jira] [Comment Edited] (FLINK-10929) Add support for Apache Arrow
[ https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16826677#comment-16826677 ] Liya Fan edited comment on FLINK-10929 at 4/30/19 3:01 AM: --- Hi [Stephan Ewen|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=StephanEwen] and [Kurt Young|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=ykt836], thanks a lot for your valuable comments. It seems there is not much debate for point 1). For point 2), I want to list a few advantages of using Arrow as internal data structures for Flink. These conclusions are based on our investigations of research papers, as well as our engineering experiences in recent development on Blink. # Arrow adopts columnar data format, which makes it possible to apply SIMD ([link|[http://prestodb.rocks/code/simd/])|http://prestodb.rocks/code/simd/%5d).]. SIMD means higher performance. Java starts to support SIMD in Java 8, and it is expected that high versions of Java will provide better support for SIMD. # Arrow provides more compact memory layout. To make it simple, encoding a batch of rows as Arrow vectors will require much less memory space, compared with encoding by BinaryRow. This, in turn, leads to two results: ## With the same amount of memory space, more data can be loaded into memory, and fewer spills are required. ## The cost of shuffling can be reduced significantly. For example, the amount of shuffled data for TPC-H Q4 reduces to almost the half. # Columnar format makes it much easier for data compression (see [paper|http://db.lcs.mit.edu/projects/cstore/abadisigmod06.pdf], for example). Our initial experiments have shown some positive results. There are more advantages for Arrow, but I just list a few, due to space constraints. The TPC-H performance in the attachment speaks for itself. We understand and respect that Blink is currently being merged, and some changes should be postponed. 
We would like to provide our devoted help in this process. However, too much change is not a good reason to refuse Arrow, if we want to make Flink the world-top-level computing engine. All these difficulties can be overcome. We have a plan to carry out the changes incrementally. was (Author: fan_li_ya): Hi [Stephan Ewen|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=StephanEwen] and [Kurt Young|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=ykt836], thanks a lot for your valuable comments. It seems there is not much debate for point 1). For point 2), I want to list a few advantages of using Arrow as internal data structures for Flink. These conclusions are based on our investigations of research papers, as well as our engineering experiences in secondary development on Blink. # Arrow adopts columnar data format, which makes it possible to apply SIMD ([link|[http://prestodb.rocks/code/simd/])|http://prestodb.rocks/code/simd/%5d).]. SIMD means higher performance. Java starts to support SIMD in Java 8, and it is expected that high versions of Java will provide better support for SIMD. # Arrow provides more compact memory layout. To make it simple, encoding a batch of rows as Arrow vectors will require much less memory space, compared with encoding by BinaryRow. This, in turn, leads to two results: ## With the same amount of memory space, more data can be loaded into memory, and fewer spills are required. ## The cost of shuffling can be reduced significantly. For example, the amount of shuffled data for TPC-H Q4 reduces to almost the half. # Columnar format makes it much easier for data compression (see [paper|http://db.lcs.mit.edu/projects/cstore/abadisigmod06.pdf], for example). Our initial experiments have shown some positive results. There are more advantages for Arrow, but I just list a few, due to space constraints. The TPC-H performance in the attachment speaks for itself. 
We understand and respect that Blink is currently being merged, and some changes should be postponed. We would like to provide our devoted help in this process. However, too much change is not a good reason to refuse Arrow, if we want to make Flink the world-top-level computing engine. All these difficulties can be overcome. We have a plan to carry out the changes incrementally. > Add support for Apache Arrow > > > Key: FLINK-10929 > URL: https://issues.apache.org/jira/browse/FLINK-10929 > Project: Flink > Issue Type: Wish > Components: Runtime / State Backends >Reporter: Pedro Cardoso Silva >Priority: Minor > Attachments: image-2019-04-10-13-43-08-107.png > > > Investigate the possibility of adding support for Apache Arrow as a > standardized columnar, memory format for data. > Given the activity that
[jira] [Created] (FLINK-12361) Remove useless expression from runtime scheduler
Liya Fan created FLINK-12361: Summary: Remove useless expression from runtime scheduler Key: FLINK-12361 URL: https://issues.apache.org/jira/browse/FLINK-12361 Project: Flink Issue Type: Improvement Components: Runtime / Operators Reporter: Liya Fan Assignee: Liya Fan Attachments: image-2019-04-29-11-16-13-492.png In the scheduleTask method of the Scheduler class, the expression forceExternalLocation is useless, since it always evaluates to false: !image-2019-04-29-11-16-13-492.png! So it can be removed. Moreover, removing this expression makes the code structure much simpler, because the branches that rely on it can be removed as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12348) Use TableConfig in api module to replace TableConfig in blink-planner module.
[ https://issues.apache.org/jira/browse/FLINK-12348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan reassigned FLINK-12348: Assignee: Liya Fan > Use TableConfig in api module to replace TableConfig in blink-planner module. > - > > Key: FLINK-12348 > URL: https://issues.apache.org/jira/browse/FLINK-12348 > Project: Flink > Issue Type: Task >Reporter: Jing Zhang >Assignee: Liya Fan >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12347) flink-table-runtime-blink is missing scala suffix
[ https://issues.apache.org/jira/browse/FLINK-12347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan reassigned FLINK-12347: Assignee: Liya Fan > flink-table-runtime-blink is missing scala suffix > - > > Key: FLINK-12347 > URL: https://issues.apache.org/jira/browse/FLINK-12347 > Project: Flink > Issue Type: Bug > Components: Build System, Table SQL / Runtime >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: Liya Fan >Priority: Blocker > Fix For: 1.9.0 > > > {{flink-table-runtime-blink}} has a dependency on {{flink-streaming-java}} > and thus requires a scala suffix. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12335) Improvement the performance of class SegmentsUtil
[ https://issues.apache.org/jira/browse/FLINK-12335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12335: - Description: Improve the performance of class SegmentsUtil: To evaluate the offset, an integer is bitand with a mask to clear to low bits, and then shift right. The bitand is useless: ((index & BIT_BYTE_POSITION_MASK) >>> 3) was: Improve the performance of class SegmentsUtil from two points: 1. In method allocateReuseBytes, the generated byte array should be cached for reuse, if the size does not exceed MAX_BYTES_LENGTH. However, the array is not cached if bytes.length < length, and this will lead to performance overhead: if (bytes == null) { if (length <= MAX_BYTES_LENGTH) { bytes = new byte[MAX_BYTES_LENGTH]; BYTES_LOCAL.set(bytes); } else { bytes = new byte[length]; } } else if (bytes.length < length) { bytes = new byte[length]; } 2. To evaluate the offset, an integer is bitand with a mask to clear to low bits, and then shift right. The bitand is useless: ((index & BIT_BYTE_POSITION_MASK) >>> 3) > Improvement the performance of class SegmentsUtil > - > > Key: FLINK-12335 > URL: https://issues.apache.org/jira/browse/FLINK-12335 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Minor > > Improve the performance of class SegmentsUtil: > To evaluate the offset, an integer is bitand with a mask to clear to low > bits, and then shift right. The bitand is useless: > ((index & BIT_BYTE_POSITION_MASK) >>> 3) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12335) Improvement the performance of class SegmentsUtil
[ https://issues.apache.org/jira/browse/FLINK-12335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12335: - Description: Improve the performance of class SegmentsUtil from two points: 1. In method allocateReuseBytes, the generated byte array should be cached for reuse, if the size does not exceed MAX_BYTES_LENGTH. However, the array is not cached if bytes.length < length, and this will lead to performance overhead: if (bytes == null) { if (length <= MAX_BYTES_LENGTH) { bytes = new byte[MAX_BYTES_LENGTH]; BYTES_LOCAL.set(bytes); } else { bytes = new byte[length]; } } else if (bytes.length < length) { bytes = new byte[length]; } 2. To evaluate the offset, an integer is bitand with a mask to clear to low bits, and then shift right. The bitand is useless: ((index & BIT_BYTE_POSITION_MASK) >>> 3) was: Improve the performance of class SegmentsUtil from two points: In method allocateReuseBytes, the generated byte array should be cached for reuse, if the size does not exceed MAX_BYTES_LENGTH. However, the array is not cached if bytes.length < length, and this will lead to performance overhead: if (bytes == null) { if (length <= MAX_BYTES_LENGTH) { bytes = new byte[MAX_BYTES_LENGTH]; BYTES_LOCAL.set(bytes); } else { bytes = new byte[length]; } } else if (bytes.length < length) { bytes = new byte[length]; } 2. To evaluate the offset, an integer is bitand with a mask to clear to low bits, and then shift right. The bitand is useless: ((index & BIT_BYTE_POSITION_MASK) >>> 3) > Improvement the performance of class SegmentsUtil > - > > Key: FLINK-12335 > URL: https://issues.apache.org/jira/browse/FLINK-12335 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Minor > > Improve the performance of class SegmentsUtil from two points: > 1. 
In method allocateReuseBytes, the generated byte array should be cached > for reuse, if the size does not exceed MAX_BYTES_LENGTH. However, the array > is not cached if bytes.length < length, and this will lead to performance > overhead: > if (bytes == null) { > if (length <= MAX_BYTES_LENGTH) { > bytes = new byte[MAX_BYTES_LENGTH]; BYTES_LOCAL.set(bytes); > } > else { > bytes = new byte[length]; > } > } else if (bytes.length < length) { > bytes = new byte[length]; > } > > 2. To evaluate the offset, an integer is bitand with a mask to clear to low > bits, and then shift right. The bitand is useless: > > ((index & BIT_BYTE_POSITION_MASK) >>> 3) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12335) Improvement the performance of class SegmentsUtil
Liya Fan created FLINK-12335: Summary: Improvement the performance of class SegmentsUtil Key: FLINK-12335 URL: https://issues.apache.org/jira/browse/FLINK-12335 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Reporter: Liya Fan Assignee: Liya Fan Improve the performance of class SegmentsUtil from two points: In method allocateReuseBytes, the generated byte array should be cached for reuse, if the size does not exceed MAX_BYTES_LENGTH. However, the array is not cached if bytes.length < length, and this will lead to performance overhead: if (bytes == null) { if (length <= MAX_BYTES_LENGTH) { bytes = new byte[MAX_BYTES_LENGTH]; BYTES_LOCAL.set(bytes); } else { bytes = new byte[length]; } } else if (bytes.length < length) { bytes = new byte[length]; } 2. To evaluate the offset, an integer is bitand with a mask to clear to low bits, and then shift right. The bitand is useless: ((index & BIT_BYTE_POSITION_MASK) >>> 3) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
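Point 1 above was later dropped from the ticket (the summary was narrowed to removing the useless mask), and a simplified sketch shows why the caching logic is in fact sound: the cached array is always MAX_BYTES_LENGTH bytes long, so the bytes.length < length branch can only be reached for oversized requests, which should not be cached anyway. The constant value and the thread-local below are stand-ins for the real SegmentsUtil fields:

```java
// Simplified sketch of the allocateReuseBytes logic quoted in the ticket.
// MAX_BYTES_LENGTH's value is an assumption for illustration.
public final class ReuseBytes {
    static final int MAX_BYTES_LENGTH = 64 * 1024;
    static final ThreadLocal<byte[]> BYTES_LOCAL = new ThreadLocal<>();

    static byte[] allocateReuseBytes(int length) {
        byte[] bytes = BYTES_LOCAL.get();
        if (bytes == null) {
            if (length <= MAX_BYTES_LENGTH) {
                bytes = new byte[MAX_BYTES_LENGTH]; // cache one full-size buffer
                BYTES_LOCAL.set(bytes);
            } else {
                bytes = new byte[length]; // oversized: allocate without caching
            }
        } else if (bytes.length < length) {
            // Only reachable when length > MAX_BYTES_LENGTH, since the cached
            // buffer is always MAX_BYTES_LENGTH long; caching is not wanted here.
            bytes = new byte[length];
        }
        return bytes;
    }
}
```

Every request up to the limit reuses the one cached buffer; only larger requests pay for a fresh allocation.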
[jira] [Commented] (FLINK-10929) Add support for Apache Arrow
[ https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16826677#comment-16826677 ] Liya Fan commented on FLINK-10929: -- Hi [Stephan Ewen|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=StephanEwen] and [Kurt Young|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=ykt836], thanks a lot for your valuable comments. It seems there is not much debate about point 1). For point 2), I want to list a few advantages of using Arrow as the internal data structure for Flink. These conclusions are based on our investigation of research papers, as well as our engineering experience in secondary development on Blink. # Arrow adopts a columnar data format, which makes it possible to apply SIMD ([link|http://prestodb.rocks/code/simd/]). SIMD means higher performance. Java started to support SIMD in Java 8, and later versions of Java are expected to provide better support for it. # Arrow provides a more compact memory layout. Put simply, encoding a batch of rows as Arrow vectors requires much less memory than encoding them with BinaryRow. This, in turn, has two consequences: ## With the same amount of memory, more data can be loaded into memory, and fewer spills are required. ## The cost of shuffling can be reduced significantly. For example, the amount of shuffled data for TPC-H Q4 is almost halved. # The columnar format makes data compression much easier (see this [paper|http://db.lcs.mit.edu/projects/cstore/abadisigmod06.pdf], for example). Our initial experiments have shown some positive results. Arrow has more advantages, but I list only a few due to space constraints. The TPC-H performance in the attachment speaks for itself. We understand and respect that Blink is currently being merged, and some changes should be postponed. We would like to provide our devoted help in this process. 
However, the scale of the change is not a good reason to reject Arrow, if we want to make Flink a world-class computing engine. All these difficulties can be overcome, and we have a plan to carry out the changes incrementally. > Add support for Apache Arrow > > > Key: FLINK-10929 > URL: https://issues.apache.org/jira/browse/FLINK-10929 > Project: Flink > Issue Type: Wish > Components: Runtime / State Backends >Reporter: Pedro Cardoso Silva >Priority: Minor > Attachments: image-2019-04-10-13-43-08-107.png > > > Investigate the possibility of adding support for Apache Arrow as a > standardized columnar, memory format for data. > Given the activity that [https://github.com/apache/arrow] is currently > getting and its claimed objective of providing a zero-copy, standardized data > format across platforms, I think it makes sense for Flink to look into > supporting it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
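The columnar-layout advantages listed in the comment above can be illustrated with plain Java arrays. This is only a minimal, self-contained sketch of the idea (it is NOT Arrow's API; Arrow stores columns in off-heap ValueVector buffers): scanning a single field touches one contiguous array, which is friendlier to JIT auto-vectorization (SIMD) and to compression than a row-at-a-time layout.

```java
// Illustrative sketch only: contrasts row-oriented and column-oriented layouts.
public class ColumnarSketch {
    // Row layout: each record's fields are adjacent in memory.
    static final class Row {
        final long id;
        final double price;
        Row(long id, double price) { this.id = id; this.price = price; }
    }

    // Column layout: each field lives in its own contiguous array.
    static final class Columns {
        final long[] ids;
        final double[] prices;
        Columns(long[] ids, double[] prices) { this.ids = ids; this.prices = prices; }
    }

    // Row scan: strides over objects; each iteration loads an unrelated id field too.
    static double sumPrices(Row[] rows) {
        double sum = 0;
        for (Row r : rows) {
            sum += r.price;
        }
        return sum;
    }

    // Column scan: a tight loop over one contiguous double[] that the JIT
    // can auto-vectorize, and that compresses well (similar adjacent values).
    static double sumPrices(Columns c) {
        double sum = 0;
        for (double p : c.prices) {
            sum += p;
        }
        return sum;
    }

    public static void main(String[] args) {
        Columns c = new Columns(new long[] {1, 2, 3}, new double[] {1.5, 2.5, 3.0});
        System.out.println(sumPrices(c)); // 7.0
    }
}
```

Both scans compute the same result; the layout only changes the memory-access pattern, which is where the performance claims above come from.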
[jira] [Commented] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
[ https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16826610#comment-16826610 ] Liya Fan commented on FLINK-12319: -- Hi [~mpf], thanks again for the additional information. It seems like a real bug in Flink. Would you please provide more information so we can reproduce the error? Can the problem be reproduced on a local host? What is the implementation of the LoginEvent class? > StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer > --- > > Key: FLINK-12319 > URL: https://issues.apache.org/jira/browse/FLINK-12319 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.6.4, 1.7.2, 1.8.0 > Environment: Ubuntu 18.04 > openjdk version "1.8.0_191" > OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12) > OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode) >Reporter: Marco Pfatschbacher >Priority: Major > > > I wrote a simple SourceFunction that creates Events in a loop. > The CEP pattern is very simple: > {code:java} > final Pattern failurePattern = > Pattern.begin("5 or more failures", > AfterMatchSkipStrategy.skipPastLastEvent()) > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("failed"); > } > }) > .times(5) > .next("1 or more successes") > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("success"); > } > }) > .times(1) > .within(Time.milliseconds(20)); > {code} > > After about 100k Events, Flink aborts with this stacktrace: > > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263) > at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41) > Caused by: java.lang.StackOverflowError > at > org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85) > at > org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > 
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at >
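The alternating releaseNode/removeNode frames in the trace above show unbounded mutual recursion. A generic remedy for this class of StackOverflowError (a sketch of the technique only, not the actual Flink fix; the Node type below is a hypothetical stand-in for SharedBuffer's internals) is to replace the recursion with an explicit work stack, so the traversal depth is bounded by heap rather than by the thread stack:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch: release a graph of reference-counted nodes iteratively, avoiding
// the deep recursion seen in the SharedBuffer stack trace above.
public class IterativeRelease {
    static final class Node {
        int refCount;
        final List<Node> predecessors = new ArrayList<>();
        boolean removed;
        Node(int refCount) { this.refCount = refCount; }
    }

    static void release(Node start) {
        Deque<Node> work = new ArrayDeque<>();
        work.push(start);
        while (!work.isEmpty()) {
            Node n = work.pop();
            if (--n.refCount <= 0 && !n.removed) {
                n.removed = true;
                // Removing a node releases its predecessors; instead of
                // recursing into them, queue them on the explicit stack.
                for (Node p : n.predecessors) {
                    work.push(p);
                }
            }
        }
    }

    public static void main(String[] args) {
        // A chain this long would overflow the thread stack if released recursively.
        Node head = new Node(1);
        Node cur = head;
        for (int i = 0; i < 200_000; i++) {
            Node next = new Node(1);
            cur.predecessors.add(next);
            cur = next;
        }
        release(head);
        System.out.println(cur.removed); // true
    }
}
```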
[jira] [Commented] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16826606#comment-16826606 ] Liya Fan commented on FLINK-12334: -- Hi [~cre...@gmail.com], thanks for opening this issue. Would you please provide more details? Which test failed? AbstractStreamOperatorTestHarnessTest#testInitializeAfterOpenning seems to be working on my computer. > change to MockStreamTask breaks OneInputStreamOperatorTestHarness > - > > Key: FLINK-12334 > URL: https://issues.apache.org/jira/browse/FLINK-12334 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.8.0 >Reporter: Cliff Resnick >Priority: Major > > The MockStreamTask that the harness is created with does not include > initialization of an Accumulator Map when using the builder > [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198] > This results in a TestHarness whose context contains an immutable empty > map, which breaks tests. The fix is simple: please include an actual map in > the builder call. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
[ https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16825624#comment-16825624 ] Liya Fan commented on FLINK-12319: -- Hi [~mpf], thank you for opening this issue. It seems like a bug related to the termination condition of the recursive calls. However, the related code no longer exists in the latest code base, so the problem may no longer occur in the next release. > StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer > --- > > Key: FLINK-12319 > URL: https://issues.apache.org/jira/browse/FLINK-12319 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.6.4, 1.7.2, 1.8.0 > Environment: Ubuntu 18.04 > openjdk version "1.8.0_191" > OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12) > OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode) >Reporter: Marco Pfatschbacher >Priority: Major > > > I wrote a simple SourceFunction that creates Events in a loop. > The CEP pattern is very simple: > {code:java} > final Pattern failurePattern = > Pattern.begin("5 or more failures", > AfterMatchSkipStrategy.skipPastLastEvent()) > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("failed"); > } > }) > .times(5) > .next("1 or more successes") > .subtype(LoginEvent.class) > .where( > new IterativeCondition() { > @Override > public boolean filter(LoginEvent value, > Context ctx) throws Exception { > return > value.get("type").equals("success"); > } > }) > .times(1) > .within(Time.milliseconds(20)); > {code} > > After about 100k Events, Flink aborts with this stacktrace: > > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263) > at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41) > Caused by: java.lang.StackOverflowError > at > org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85) > at > org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > 
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355) > at > org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342) >
[jira] [Commented] (FLINK-12115) Add support in Flink to persist state in Azure's blob store
[ https://issues.apache.org/jira/browse/FLINK-12115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16823638#comment-16823638 ] Liya Fan commented on FLINK-12115: -- I vote for this issue, because it enhances the portability of Flink. Hope more people will pay attention to this issue. > Add support in Flink to persist state in Azure's blob store > --- > > Key: FLINK-12115 > URL: https://issues.apache.org/jira/browse/FLINK-12115 > Project: Flink > Issue Type: New Feature > Components: FileSystems >Reporter: Piyush Narang >Priority: Minor > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > The existing set of flink-filesystems include filesystems like S3 / HDFS / > MapR etc. For folks using Azure, it would be nice to include support for > Azure's blob store as a filesystem as well. This would enable us to use Azure > blob store to store state / checkpoints for running Flink jobs. > Support for Azure's filesystem is part of the hadoop project in the > [hadoop-azure|https://github.com/apache/hadoop/tree/trunk/hadoop-tools/hadoop-azure] > module. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12290) Fix the misleading exception message in SharedSlot class
Liya Fan created FLINK-12290: Summary: Fix the misleading exception message in SharedSlot class Key: FLINK-12290 URL: https://issues.apache.org/jira/browse/FLINK-12290 Project: Flink Issue Type: Bug Reporter: Liya Fan Assignee: Liya Fan The exception message in SharedSlot.releaseSlot is misleading. It says "SharedSlot is not empty and released". But the condition should be "SharedSlot is not released and empty." -- This message was sent by Atlassian JIRA (v7.6.3#76005)
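The mismatch FLINK-12290 describes is between the checked condition and the message text. A tiny sketch of the pattern (field names are hypothetical, not the actual SharedSlot internals): the exception message should describe the state that actually violated the check.

```java
// Sketch: keep an IllegalStateException message consistent with its condition.
// Releasing is only legal when the slot is NOT released and IS empty, so the
// failure message must describe the opposite state.
public class SlotCheck {
    static void checkReleasable(boolean released, boolean empty) {
        if (released || !empty) {
            throw new IllegalStateException(
                "SharedSlot is released or not empty, cannot release: released="
                    + released + ", empty=" + empty);
        }
    }
}
```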
[jira] [Created] (FLINK-12289) Fix bugs and typos in Memory manager
Liya Fan created FLINK-12289: Summary: Fix bugs and typos in Memory manager Key: FLINK-12289 URL: https://issues.apache.org/jira/browse/FLINK-12289 Project: Flink Issue Type: Bug Components: Runtime / Operators Reporter: Liya Fan Assignee: Liya Fan According to the JavaDoc, MemoryManager.release method should throw an NPE if the input argument is null. In addition, there are some typos in class MemoryManager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
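The JavaDoc contract mentioned in FLINK-12289 (throw an NPE on a null argument) is typically enforced up front with `Objects.requireNonNull`. A minimal sketch of the contract, not the actual MemoryManager code:

```java
import java.util.List;
import java.util.Objects;

// Sketch: fail fast with a NullPointerException when the argument is null,
// as the method's JavaDoc promises, instead of silently doing nothing.
public class ReleaseContract {
    static <T> void release(List<T> segments) {
        Objects.requireNonNull(segments, "segments must not be null");
        segments.clear(); // stand-in for the real release logic
    }
}
```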
[jira] [Commented] (FLINK-12259) Improve debuggability of method invocation failures in OptimizerPlanEnvironment
[ https://issues.apache.org/jira/browse/FLINK-12259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16823035#comment-16823035 ] Liya Fan commented on FLINK-12259: -- Hi [~gauravtiwari89], I see. Replied :) Thank you. > Improve debuggability of method invocation failures in > OptimizerPlanEnvironment > > > Key: FLINK-12259 > URL: https://issues.apache.org/jira/browse/FLINK-12259 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.8.0 >Reporter: Gaurav >Assignee: Liya Fan >Priority: Minor > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > In cases where method invocation fails without setting the `optimizerPlan`, > Flink does not always dump the stderr/stdout. Hence, logging from the method > is lost. The stacktrace alone is not always helpful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12279) Create source to allow streaming data from websocket.
[ https://issues.apache.org/jira/browse/FLINK-12279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822827#comment-16822827 ] Liya Fan commented on FLINK-12279: -- Hi [~Wosinsan], thanks for opening this issue. Would you please give more details about it? This sounds like an interesting one. > Create source to allow streaming data from websocket. > - > > Key: FLINK-12279 > URL: https://issues.apache.org/jira/browse/FLINK-12279 > Project: Flink > Issue Type: Improvement >Reporter: Dominik Wosiński >Priority: Major > > Currently, there exists an API that allows users to read data from a regular > Java socket. I think we should also create an API that will allow reading and > streaming data from websockets too. Java does have the `javax.websocket-api` > that allows asynchronous reading from websockets and I think it could be used > for this case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12259) Improve debuggability of method invocation failures in OptimizerPlanEnvironment
[ https://issues.apache.org/jira/browse/FLINK-12259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821637#comment-16821637 ] Liya Fan commented on FLINK-12259: -- [~quan], thanks a lot for the comments. [~gauravtiwari89], I have sent a PR, please help take a look. > Improve debuggability of method invocation failures in > OptimizerPlanEnvironment > > > Key: FLINK-12259 > URL: https://issues.apache.org/jira/browse/FLINK-12259 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.8.0 >Reporter: Gaurav >Assignee: Liya Fan >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In cases where method invocation fails without setting the `optimizerPlan`, > Flink does not always dump the stderr/stdout. Hence, logging from the method > is lost. The stacktrace alone is not always helpful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12259) Improve debuggability of method invocation failures in OptimizerPlanEnvironment
[ https://issues.apache.org/jira/browse/FLINK-12259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan reassigned FLINK-12259: Assignee: Liya Fan > Improve debuggability of method invocation failures in > OptimizerPlanEnvironment > > > Key: FLINK-12259 > URL: https://issues.apache.org/jira/browse/FLINK-12259 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.8.0 >Reporter: Gaurav >Assignee: Liya Fan >Priority: Minor > > In cases where method invocation fails without setting the `optimizerPlan`, > Flink does not always dump the stderr/stdout. Hence, logging from the method > is lost. The stacktrace alone is not always helpful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12246) can't get any option from the jobConfiguration of JobGraph
[ https://issues.apache.org/jira/browse/FLINK-12246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16820763#comment-16820763 ] Liya Fan commented on FLINK-12246: -- Thanks for opening this issue. I do not see this as a problem. The configuration is empty initially, but values can be inserted into it later. > can't get any option from the jobConfiguration of JobGraph > -- > > Key: FLINK-12246 > URL: https://issues.apache.org/jira/browse/FLINK-12246 > Project: Flink > Issue Type: Bug > Components: Runtime / Configuration >Affects Versions: 1.6.3 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Fix For: 1.9.0 > > > the jobConfiguration was defined as a final variable. > {code:java} > /** The job configuration attached to this job. */ > private final Configuration jobConfiguration = new Configuration(); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
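The point in the comment above is that `final` only fixes the reference, not the contents: the `Configuration` object can still be populated after construction. A plain-Java analogue using a Map in place of Flink's Configuration:

```java
import java.util.HashMap;
import java.util.Map;

// A final field prevents reassignment of the reference, not mutation of the
// object it points to — so a "final" configuration is still fillable later.
public class FinalMutability {
    private final Map<String, String> jobConfiguration = new HashMap<>();

    void configure() {
        // Legal: we mutate the map; we never reassign the final field.
        jobConfiguration.put("parallelism", "4");
    }

    Map<String, String> config() { return jobConfiguration; }
}
```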
[jira] [Updated] (FLINK-12223) HeapMemorySegment.getArray should return null after being freed
[ https://issues.apache.org/jira/browse/FLINK-12223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12223: - Description: According to the JavaDoc, HeapMemorySegment.getArray should return null after free is called, but it does not. (was: According to the JavaDoc, HeapMemorySegment.getArray should return null, but it does not. ) > HeapMemorySegment.getArray should return null after being freed > --- > > Key: FLINK-12223 > URL: https://issues.apache.org/jira/browse/FLINK-12223 > Project: Flink > Issue Type: Bug >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Major > > According to the JavaDoc, HeapMemorySegment.getArray should return null after > free is called, but it does not. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12223) HeapMemorySegment.getArray should return null after being freed
Liya Fan created FLINK-12223: Summary: HeapMemorySegment.getArray should return null after being freed Key: FLINK-12223 URL: https://issues.apache.org/jira/browse/FLINK-12223 Project: Flink Issue Type: Bug Reporter: Liya Fan Assignee: Liya Fan According to the JavaDoc, HeapMemorySegment.getArray should return null, but it does not. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
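The documented behavior FLINK-12223 asks for — the accessor observing the cleared reference after `free()` — can be sketched with a hypothetical minimal class (not the real HeapMemorySegment):

```java
// Minimal sketch: return the nulled-out field after free(), so callers can
// detect use-after-free instead of silently reading a stale array.
public class HeapSegmentSketch {
    private byte[] heapArray;

    HeapSegmentSketch(int size) { this.heapArray = new byte[size]; }

    void free() { this.heapArray = null; }

    // Returns the live array, or null once the segment has been freed.
    byte[] getArray() { return heapArray; }
}
```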
[jira] [Commented] (FLINK-12189) Fix bugs and typos in memory management related code
[ https://issues.apache.org/jira/browse/FLINK-12189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819706#comment-16819706 ] Liya Fan commented on FLINK-12189: -- To make the PRs easier to understand and review, I have split the bugs into separate JIRAs, and provided UTs and PRs separately. > Fix bugs and typos in memory management related code > > > Key: FLINK-12189 > URL: https://issues.apache.org/jira/browse/FLINK-12189 > Project: Flink > Issue Type: Bug > Components: Runtime / Operators >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Critical > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > There are some bugs and typos in the memory related source code. For example, > # In MemoryManager.release method, it should throw an NPE, if the input is > null. But it does not. > # In HybridMemorySegment.put and get methods, the number of bytes > read/written should be specified by the input parameter, rather than > reading/writing until the end. > # In HeapMemorySegment, the data member memory should be returned. This is > because the member heapArray will not be reset to null after calling the > release method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12216) Respect the number of bytes from input parameters in HybridMemorySegment
Liya Fan created FLINK-12216: Summary: Respect the number of bytes from input parameters in HybridMemorySegment Key: FLINK-12216 URL: https://issues.apache.org/jira/browse/FLINK-12216 Project: Flink Issue Type: Bug Reporter: Liya Fan Assignee: Liya Fan For the following two methods in the HybridMemorySegment class, public final void get(int offset, ByteBuffer target, int numBytes) public final void put(int offset, ByteBuffer source, int numBytes) the actual number of bytes read/written should be determined by the input parameter numBytes, but for some types of ByteBuffer it is not. Instead, the methods simply read/write until the end. This is a bug and I am going to fix it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
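One standard way to honor `numBytes` regardless of the ByteBuffer flavor is to bound the bulk transfer with the source buffer's limit. This is a generic sketch of the technique, not the actual HybridMemorySegment fix: `ByteBuffer.put(ByteBuffer)` copies ALL remaining bytes of its argument, which is exactly the "read/write until the end" bug described above.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

// Sketch: transfer exactly numBytes between two ByteBuffers by temporarily
// capping the source's limit, then restoring the caller's view.
public class BoundedTransfer {
    static void transfer(ByteBuffer source, ByteBuffer target, int numBytes) {
        if (numBytes > source.remaining() || numBytes > target.remaining()) {
            throw new BufferOverflowException();
        }
        int oldLimit = source.limit();
        source.limit(source.position() + numBytes); // cap the bulk copy
        try {
            target.put(source); // now copies exactly numBytes, not "the rest"
        } finally {
            source.limit(oldLimit); // restore the caller's view of the buffer
        }
    }
}
```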
[jira] [Created] (FLINK-12210) Fix a bug in AbstractPagedInputView.readLine
Liya Fan created FLINK-12210: Summary: Fix a bug in AbstractPagedInputView.readLine Key: FLINK-12210 URL: https://issues.apache.org/jira/browse/FLINK-12210 Project: Flink Issue Type: Bug Components: Runtime / Operators Reporter: Liya Fan Assignee: Liya Fan In AbstractPagedInputView.readLine, character '\r' is removed in two places, which is redundant. In addition, only trailing '\r' should be removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
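The intended behavior in FLINK-12210 — strip a line terminator's trailing '\r' (Windows line endings) but keep any '\r' embedded inside the line — can be sketched as:

```java
// Sketch: after reading a line up to '\n', drop only a single trailing '\r',
// leaving interior '\r' characters untouched.
public class LineTrim {
    static String stripTrailingCr(String line) {
        if (line.endsWith("\r")) {
            return line.substring(0, line.length() - 1);
        }
        return line;
    }
}
```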
[jira] [Updated] (FLINK-12189) Fix bugs and typos in memory management related code
[ https://issues.apache.org/jira/browse/FLINK-12189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12189: - Priority: Critical (was: Major) > Fix bugs and typos in memory management related code > > > Key: FLINK-12189 > URL: https://issues.apache.org/jira/browse/FLINK-12189 > Project: Flink > Issue Type: Bug > Components: Runtime / Operators >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Critical > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > There are some bugs and typos in the memory related source code. For example, > # In MemoryManager.release method, it should throw an NPE, if the input is > null. But it does not. > # In HybridMemorySegment.put and get methods, the number of bytes > read/written should be specified by the input parameter, rather than > reading/writing until the end. > # In HeapMemorySegment, the data member memory should be returned. This is > because the member heapArray will not be reset to null after calling the > release method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12204) Improve JDBCOutputFormat ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-12204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818567#comment-16818567 ] Liya Fan commented on FLINK-12204: -- [~hequn8128], thanks for your fast response. Please go ahead and open your PR :) > Improve JDBCOutputFormat ClassCastException > --- > > Key: FLINK-12204 > URL: https://issues.apache.org/jira/browse/FLINK-12204 > Project: Flink > Issue Type: Task > Components: Connectors / JDBC >Affects Versions: 1.8.0 >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Major > Fix For: 1.9.0, 1.8.1 > > > ClassCastExceptions thrown by JDBCOutputFormat are not very helpful because > they do not provide information for which input field the cast failed. > We should catch the exception and enrich it with information about the > affected field to make it more useful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12204) Improve JDBCOutputFormat ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-12204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818499#comment-16818499 ] Liya Fan commented on FLINK-12204: -- I vote for this issue, because it helps debugging. If no one else is looking at it, I would like to help resolve it. > Improve JDBCOutputFormat ClassCastException > --- > > Key: FLINK-12204 > URL: https://issues.apache.org/jira/browse/FLINK-12204 > Project: Flink > Issue Type: Task > Components: Connectors / JDBC >Affects Versions: 1.8.0 >Reporter: Fabian Hueske >Priority: Major > Fix For: 1.9.0, 1.8.1 > > > ClassCastExceptions thrown by JDBCOutputFormat are not very helpful because > they do not provide information for which input field the cast failed. > We should catch the exception and enrich it with information about the > affected field to make it more useful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12187) wrap FileWriter with BufferedWriter for better performance
[ https://issues.apache.org/jira/browse/FLINK-12187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817620#comment-16817620 ] Liya Fan commented on FLINK-12187: -- [~bd2019us], thanks for opening this issue. Could you please give some references showing that the performance of BufferedWriter is better? > wrap FileWriter with BufferedWriter for better performance > -- > > Key: FLINK-12187 > URL: https://issues.apache.org/jira/browse/FLINK-12187 > Project: Flink > Issue Type: Bug > Components: Examples >Reporter: bd2019us >Priority: Major > Labels: patch-available, pull-requests-available > Attachments: 1.patch > > Time Spent: 10m > Remaining Estimate: 0h > > Location: > src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java > The FileWriter.write() method is invoked multiple times in loops, which is > bad for the performance of the program. The decorator class BufferedWriter can be > used to alleviate the impact of frequent IO operations with buffering. Therefore, > when the write() method is used intensively, BufferedWriter is highly > recommended and should be preferred. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
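The change proposed in FLINK-12187 is simply to decorate the FileWriter so that many small write() calls hit an in-memory buffer instead of the OS each time. A minimal sketch (the path and line count are illustrative, not the WebLogDataGenerator code):

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: wrap FileWriter in BufferedWriter so repeated small writes are
// batched in memory and flushed in large chunks, reducing syscall overhead.
public class BufferedWrite {
    static void writeLines(Path path, int n) throws IOException {
        try (BufferedWriter out = new BufferedWriter(new FileWriter(path.toFile()))) {
            for (int i = 0; i < n; i++) {
                out.write("line " + i);
                out.newLine();
            }
        } // try-with-resources close() flushes the buffer
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("weblog", ".txt");
        writeLines(tmp, 1000);
        System.out.println(Files.readAllLines(tmp).size()); // 1000
        Files.delete(tmp);
    }
}
```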
[jira] [Updated] (FLINK-12189) Fix bugs and typos in memory management related code
[ https://issues.apache.org/jira/browse/FLINK-12189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12189: - Summary: Fix bugs and typos in memory management related code (was: Fix bugs and typos in memory management related classes) > Fix bugs and typos in memory management related code > > > Key: FLINK-12189 > URL: https://issues.apache.org/jira/browse/FLINK-12189 > Project: Flink > Issue Type: Bug > Components: Runtime / Operators >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Major > > There are some bugs and typos in the memory related source code. For example, > # In MemoryManager.release method, it should throw an NPE, if the input is > null. But it does not. > # In HybridMemorySegment.put and get methods, the number of bytes > read/written should be specified by the input parameter, rather than > reading/writing until the end. > # In HeapMemorySegment, the data member memory should be returned. This is > because the member heapArray will not be reset to null after calling the > release method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12189) Fix bugs and typos in memory management related classes
Liya Fan created FLINK-12189: Summary: Fix bugs and typos in memory management related classes Key: FLINK-12189 URL: https://issues.apache.org/jira/browse/FLINK-12189 Project: Flink Issue Type: Bug Components: Runtime / Operators Reporter: Liya Fan Assignee: Liya Fan There are some bugs and typos in the memory related source code. For example, # In MemoryManager.release method, it should throw an NPE, if the input is null. But it does not. # In HybridMemorySegment.put and get methods, the number of bytes read/written should be specified by the input parameter, rather than reading/writing until the end. # In HeapMemorySegment, the data member memory should be returned. This is because the member heapArray will not be reset to null after calling the release method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12162) Build error in flink-table-planner
[ https://issues.apache.org/jira/browse/FLINK-12162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815361#comment-16815361 ] Liya Fan commented on FLINK-12162: -- Hi [~dawidwys], thanks for your comments. This problem is triggered only by early versions of JDK 8, as you indicated. Hopefully none of our users are still on those old versions. > Build error in flink-table-planner > -- > > Key: FLINK-12162 > URL: https://issues.apache.org/jira/browse/FLINK-12162 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Critical > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > There is a build error in project flink-table-planner: > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile > (default-compile) on project flink-table-planner_2.11: Compilation failure > [ERROR] > .../flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java:[85,54] > unreported exception X; must be caught or declared to be thrown > I am using JDK 1.8.0_45, maven 3.5.2, my OS is Linux with kernel 3.10.0-327 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10929) Add support for Apache Arrow
[ https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815264#comment-16815264 ] Liya Fan commented on FLINK-10929: -- Hi [~fhueske], thank you for your kind reply. I agree with most points. The changes should be incremental so as not to break other components. So I think the first step is to provide a flag which is disabled by default, and let the MemoryManager depend on the Arrow BufferAllocator. With this change, all MemorySegments will be Arrow buffers, but this is transparent to other components and will not break them. I guess I will initiate a discussion on the dev mailing list. BTW, my mail address has been removed from the mailing list for a couple of days, because dev-help claims that it received some bouncing messages. Would you please add another email address of mine (fan_li...@aliyun.com) to the mailing list? Thank you in advance. > Add support for Apache Arrow > > > Key: FLINK-10929 > URL: https://issues.apache.org/jira/browse/FLINK-10929 > Project: Flink > Issue Type: Wish > Components: Runtime / State Backends >Reporter: Pedro Cardoso Silva >Priority: Minor > Attachments: image-2019-04-10-13-43-08-107.png > > > Investigate the possibility of adding support for Apache Arrow as a > standardized columnar, memory format for data. > Given the activity that [https://github.com/apache/arrow] is currently > getting and its claimed objective of providing a zero-copy, standardized data > format across platforms, I think it makes sense for Flink to look into > supporting it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
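The incremental plan described in the comment above — a disabled-by-default flag that swaps the allocator backing MemorySegment — could be sketched as a factory. All names here are hypothetical (this is neither Flink's nor Arrow's API); the alternative branch stands in for where an Arrow BufferAllocator would be plugged in:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a flag-gated allocator swap. In the proposal, the
// flagged branch would delegate to Arrow's BufferAllocator; here a direct
// ByteBuffer is a stand-in so the sketch stays self-contained.
public class AllocatorFactory {
    interface Allocator {
        ByteBuffer allocate(int bytes);
    }

    // Hypothetical system property name; off by default, so existing
    // components see unchanged behavior.
    static final boolean USE_ARROW_ALLOCATOR =
        Boolean.getBoolean("memory.use-arrow-allocator");

    static Allocator create() {
        if (USE_ARROW_ALLOCATOR) {
            return ByteBuffer::allocateDirect; // stand-in for the Arrow-backed path
        }
        return ByteBuffer::allocate; // existing heap path, unchanged
    }
}
```

The flag keeps the change transparent: callers only ever see the `Allocator` interface, so flipping the backing implementation does not ripple into other components.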
[jira] [Commented] (FLINK-12162) Build error in flink-table-planner
[ https://issues.apache.org/jira/browse/FLINK-12162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815229#comment-16815229 ] Liya Fan commented on FLINK-12162: -- Hi [~dawidwys], thanks a lot for your kind reminder. I checked the branch, and it is master; I synced the code from apache/flink more than an hour ago. Unfortunately, this problem can be reproduced consistently. > Build error in flink-table-planner > -- > > Key: FLINK-12162 > URL: https://issues.apache.org/jira/browse/FLINK-12162 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Critical > > There is a build error in project flink-table-planner: > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile > (default-compile) on project flink-table-planner_2.11: Compilation failure > [ERROR] > .../flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java:[85,54] > unreported exception X; must be caught or declared to be thrown > I am using JDK 1.8.0_45, maven 3.5.2, my OS is Linux with kernel 3.10.0-327 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12162) Build error in flink-table-planner
[ https://issues.apache.org/jira/browse/FLINK-12162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-12162: - Description: There is a build error in project flink-table-planner: [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-compile) on project flink-table-planner_2.11: Compilation failure [ERROR] .../flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java:[85,54] unreported exception X; must be caught or declared to be thrown I am using JDK 1.8.0_45, maven 3.5.2, my OS is Linux with kernel 3.10.0-327 was: There is a build error in project flink-table-planner: [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-compile) on project flink-table-planner_2.11: Compilation failure [ERROR] .../flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java:[85,54] unreported exception X; must be caught or declared to be thrown I am using JDK 1.8.0_45, maven 3.5.2 > Build error in flink-table-planner > -- > > Key: FLINK-12162 > URL: https://issues.apache.org/jira/browse/FLINK-12162 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Critical > > There is a build error in project flink-table-planner: > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile > (default-compile) on project flink-table-planner_2.11: Compilation failure > [ERROR] > .../flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java:[85,54] > unreported exception X; must be caught or declared to be thrown > I am using JDK 1.8.0_45, maven 3.5.2, my OS is Linux with kernel 3.10.0-327 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12162) Build error in flink-table-planner
Liya Fan created FLINK-12162: Summary: Build error in flink-table-planner Key: FLINK-12162 URL: https://issues.apache.org/jira/browse/FLINK-12162 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Reporter: Liya Fan Assignee: Liya Fan There is a build error in project flink-table-planner: [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-compile) on project flink-table-planner_2.11: Compilation failure [ERROR] .../flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java:[85,54] unreported exception X; must be caught or declared to be thrown I am using JDK 1.8.0_45, maven 3.5.2 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10929) Add support for Apache Arrow
[ https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815155#comment-16815155 ] Liya Fan edited comment on FLINK-10929 at 4/11/19 7:45 AM: --- Hi [~yanghua], thank you so much for your feedback. Below is an initial draft of our design document: [https://docs.google.com/document/d/1LstaiGzlzTdGUmyG_-9GSWleKpLRid8SJWXQCTqcQB4/edit?usp=sharing] Please give your valuable comments. was (Author: fan_li_ya): Hi [~yanghua], thank you so much for your feedback. Below is an initial draft of our design document: https://docs.google.com/document/d/1LstaiGzlzTdGUmyG_-9GSWleKpLRid8SJWXQCTqcQB4/edit?usp=sharing > Add support for Apache Arrow > > > Key: FLINK-10929 > URL: https://issues.apache.org/jira/browse/FLINK-10929 > Project: Flink > Issue Type: Wish > Components: Runtime / State Backends >Reporter: Pedro Cardoso Silva >Priority: Minor > Attachments: image-2019-04-10-13-43-08-107.png > > > Investigate the possibility of adding support for Apache Arrow as a > standardized columnar, in-memory format for data. > Given the activity that [https://github.com/apache/arrow] is currently > getting and its claimed objective of providing a zero-copy, standardized data > format across platforms, I think it makes sense for Flink to look into > supporting it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10929) Add support for Apache Arrow
[ https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815155#comment-16815155 ] Liya Fan commented on FLINK-10929: -- Hi [~yanghua], thank you so much for your feedback. Below is an initial draft of our design document: https://docs.google.com/document/d/1LstaiGzlzTdGUmyG_-9GSWleKpLRid8SJWXQCTqcQB4/edit?usp=sharing > Add support for Apache Arrow > > > Key: FLINK-10929 > URL: https://issues.apache.org/jira/browse/FLINK-10929 > Project: Flink > Issue Type: Wish > Components: Runtime / State Backends >Reporter: Pedro Cardoso Silva >Priority: Minor > Attachments: image-2019-04-10-13-43-08-107.png > > > Investigate the possibility of adding support for Apache Arrow as a > standardized columnar, in-memory format for data. > Given the activity that [https://github.com/apache/arrow] is currently > getting and its claimed objective of providing a zero-copy, standardized data > format across platforms, I think it makes sense for Flink to look into > supporting it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815127#comment-16815127 ] Liya Fan commented on FLINK-11799: -- Hi [~longtimer], thanks a lot for your feedback. I have prepared a PR, please take a look and give your valuable comments. > KryoSerializer/OperatorChain ignores copy failure resulting in > NullPointerException > --- > > Key: FLINK-11799 > URL: https://issues.apache.org/jira/browse/FLINK-11799 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.2 >Reporter: Jason Kania >Assignee: Liya Fan >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > I was encountering a problem with NullPointerExceptions with the deserialized > object reaching my ProcessFunction process() method implementation as a null > value. Upon investigation, I discovered two issues with the implementation of > the KryoSerializer copy(). > 1) The 'public T copy(T from)' method swallows the error if the kryo copy() > call generates an exception. The code should report the copy error at least > once as a warning to be aware that the kryo copy() is failing. I understand > that the code is there to handle the lack of a copy implementation but due to > the potential inefficiency of having to write and read the object instead of > copying it, this would seem useful information to share at the least. It is > also important to have a warning in case the cause of the copy error is > something that needs to be fixed. > 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the > fact that the kryo readObject(Input input, Class aClass) method may return a > null value if there are any issues. 
This could be handled with a check or > warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method > but is also ignored there, allowing a null value to be passed along without > providing any reason for the null value in logging. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10929) Add support for Apache Arrow
[ https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815009#comment-16815009 ] Liya Fan commented on FLINK-10929: -- We have imported Arrow in our efforts to vectorize Blink batch jobs. It is an incremental change, which can be easily turned off with a single flag, and it does not affect other parts of the code base. [~fhueske], do you think it is a good time to make some initial attempts to incorporate such changes now? :) > Add support for Apache Arrow > > > Key: FLINK-10929 > URL: https://issues.apache.org/jira/browse/FLINK-10929 > Project: Flink > Issue Type: Wish > Components: Runtime / State Backends >Reporter: Pedro Cardoso Silva >Priority: Minor > Attachments: image-2019-04-10-13-43-08-107.png > > > Investigate the possibility of adding support for Apache Arrow as a > standardized columnar, in-memory format for data. > Given the activity that [https://github.com/apache/arrow] is currently > getting and its claimed objective of providing a zero-copy, standardized data > format across platforms, I think it makes sense for Flink to look into > supporting it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan reassigned FLINK-11799: Assignee: Liya Fan > KryoSerializer/OperatorChain ignores copy failure resulting in > NullPointerException > --- > > Key: FLINK-11799 > URL: https://issues.apache.org/jira/browse/FLINK-11799 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.2 >Reporter: Jason Kania >Assignee: Liya Fan >Priority: Major > > I was encountering a problem with NullPointerExceptions with the deserialized > object reaching my ProcessFunction process() method implementation as a null > value. Upon investigation, I discovered two issues with the implementation of > the KryoSerializer copy(). > 1) The 'public T copy(T from)' method swallows the error if the kryo copy() > call generates an exception. The code should report the copy error at least > once as a warning to be aware that the kryo copy() is failing. I understand > that the code is there to handle the lack of a copy implementation but due to > the potential inefficiency of having to write and read the object instead of > copying it, this would seem useful information to share at the least. It is > also important to have a warning in case the cause of the copy error is > something that needs to be fixed. > 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the > fact that the kryo readObject(Input input, Class aClass) method may return a > null value if there are any issues. This could be handled with a check or > warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method > but is also ignored there, allowing a null value to be passed along without > providing any reason for the null value in logging. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-3685) Logical error in code for DateSerializer deserialize with reuse
[ https://issues.apache.org/jira/browse/FLINK-3685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814396#comment-16814396 ] Liya Fan commented on FLINK-3685: - Hi [~bowen.zheng], thank you for opening this issue. The NPE indicates bugs in the system. However, I do not think it is DateSerializer's responsibility, because it is the caller's responsibility to create a reuse object, and make sure it is not null. The bug should be elsewhere. Can you please provide the whole source code to reproduce the problem? BTW, the latest code no longer uses -1 as an indicator for null. The latest code looks like this:
{code}
@Override
public Date deserialize(Date reuse, DataInputView source) throws IOException {
    final long v = source.readLong();
    if (v == Long.MIN_VALUE) {
        return null;
    }
    reuse.setTime(v);
    return reuse;
}
{code}
> Logical error in code for DateSerializer deserialize with reuse > --- > > Key: FLINK-3685 > URL: https://issues.apache.org/jira/browse/FLINK-3685 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.0.0 >Reporter: ZhengBowen >Priority: Major > > There is a logical error in the following function in DateSerializer.java > when source read '-1' > function is: > {code} > public Date deserialize(Date reuse, DataInputView source) throws IOException { > long v = source.readLong(); > if(v == -1L) { > return null; > } > reuse.setTime(v); > return reuse; > } > {code} > when call this function for first time, if return null, then 'reuse' will be > set null by caller; > when call this function for second time,if 'v!=-1' ,reuse.setTime(v) will > throw NPE. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
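The contract discussed above can be sketched in plain Java. This is an illustrative stand-in, not Flink's actual DateSerializer: it uses java.io streams instead of Flink's DataInputView/DataOutputView, and the class name is hypothetical; the Long.MIN_VALUE sentinel matches the quoted code, and the main method shows the caller-side contract that the reuse object must be kept non-null even after a null return.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Date;

// Hypothetical stand-in for the serializer discussed in FLINK-3685.
public class DateSerializerSketch {

    // Long.MIN_VALUE is the null sentinel, as in the current Flink code;
    // -1L is a valid epoch millisecond and must not be used as the marker.
    static void serialize(Date value, DataOutputStream target) throws IOException {
        target.writeLong(value == null ? Long.MIN_VALUE : value.getTime());
    }

    // The caller owns the reuse object and must pass a non-null instance;
    // a null return here means "the serialized value was null", not an error.
    static Date deserialize(Date reuse, DataInputStream source) throws IOException {
        final long v = source.readLong();
        if (v == Long.MIN_VALUE) {
            return null;
        }
        reuse.setTime(v);
        return reuse;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        serialize(null, out);          // null round-trips via the sentinel
        serialize(new Date(-1L), out); // -1L is a legitimate timestamp

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        Date reuse = new Date(0L);
        System.out.println(deserialize(reuse, in) == null);   // true
        // The caller keeps reuse non-null, so the next call cannot NPE:
        System.out.println(deserialize(reuse, in).getTime()); // -1
    }
}
```

The original bug report arises exactly when the caller overwrites its reuse reference with the null return value and passes that null back in.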
[jira] [Commented] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814248#comment-16814248 ] Liya Fan commented on FLINK-11799: -- Hi Jason, thanks for opening this issue. For problem 1), I don't think it is a big problem, although I think some warning should be generated at least. This is because it only swallows exception of type KryoException, and for this type of exception, it handles it by reading and serializing the raw data directly. This should be fine, for data that cannot be copied. For problem 2), I think it is a good point, because it can be asserted at this point in the code that the return value should never be null. However, I think this check should be made by the copy or readObject method. If you think it is OK, I can take care of this issue :) > KryoSerializer/OperatorChain ignores copy failure resulting in > NullPointerException > --- > > Key: FLINK-11799 > URL: https://issues.apache.org/jira/browse/FLINK-11799 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.2 >Reporter: Jason Kania >Priority: Major > > I was encountering a problem with NullPointerExceptions with the deserialized > object reaching my ProcessFunction process() method implementation as a null > value. Upon investigation, I discovered two issues with the implementation of > the KryoSerializer copy(). > 1) The 'public T copy(T from)' method swallows the error if the kryo copy() > call generates an exception. The code should report the copy error at least > once as a warning to be aware that the kryo copy() is failing. I understand > that the code is there to handle the lack of a copy implementation but due to > the potential inefficiency of having to write and read the object instead of > copying it, this would seem useful information to share at the least. It is > also important to have a warning in case the cause of the copy error is > something that needs to be fixed. 
> 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the > fact that the kryo readObject(Input input, Class aClass) method may return a > null value if there are any issues. This could be handled with a check or > warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method > but is also ignored there, allowing a null value to be passed along without > providing any reason for the null value in logging. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
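The pattern debated in this thread (attempt a direct copy, fall back to a serialize/deserialize round trip on failure, warn at least once, and never let a null result propagate to user functions) can be sketched generically. This is a hypothetical illustration, not Flink's KryoSerializer: the CopyWithFallback class and its UnaryOperator parameters are invented for the sketch.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.UnaryOperator;

// Hypothetical sketch of copy-with-fallback, as discussed for FLINK-11799.
public class CopyWithFallback<T> {

    private final UnaryOperator<T> directCopy;     // may throw, like kryo.copy(from)
    private final UnaryOperator<T> roundTripCopy;  // serialize then deserialize the value
    private final AtomicBoolean warned = new AtomicBoolean(false);

    public CopyWithFallback(UnaryOperator<T> directCopy, UnaryOperator<T> roundTripCopy) {
        this.directCopy = directCopy;
        this.roundTripCopy = roundTripCopy;
    }

    public T copy(T from) {
        if (from == null) {
            return null; // null input legitimately copies to null
        }
        T result;
        try {
            result = directCopy.apply(from);
        } catch (RuntimeException e) {
            // Report the failure at least once instead of swallowing it silently.
            if (warned.compareAndSet(false, true)) {
                System.err.println("WARN: direct copy failed, falling back to serialization: " + e);
            }
            result = roundTripCopy.apply(from);
        }
        // Fail fast here rather than letting a null reach downstream operators.
        if (result == null) {
            throw new IllegalStateException("copy produced null for non-null input");
        }
        return result;
    }

    public static void main(String[] args) {
        CopyWithFallback<StringBuilder> copier = new CopyWithFallback<>(
            sb -> { throw new RuntimeException("copy not supported"); },
            sb -> new StringBuilder(sb.toString()));
        StringBuilder original = new StringBuilder("hello");
        StringBuilder copy = copier.copy(original); // warns once, then falls back
        System.out.println(copy + " " + (copy != original)); // hello true
    }
}
```

The null check at the end is the point raised as problem 2): placing it in the copy path gives a clear error at the source instead of a NullPointerException later in pushToOperator().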
[jira] [Commented] (FLINK-12153) Error when submitting a Flink job to the Flink environment
[ https://issues.apache.org/jira/browse/FLINK-12153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814230#comment-16814230 ] Liya Fan commented on FLINK-12153: -- According to the error message, your Hadoop version is outdated. Please try again with a newer version of Hadoop. > Error when submitting a Flink job to the Flink environment > -- > > Key: FLINK-12153 > URL: https://issues.apache.org/jira/browse/FLINK-12153 > Project: Flink > Issue Type: Bug >Affects Versions: 1.7.2 > Environment: flink maven > > org.apache.flink > flink-streaming-java_2.12 > 1.7.1 > > > > org.apache.flink > flink-connector-kafka-0.11_2.12 > 1.7.1 > > > org.apache.hadoop > hadoop-hdfs > 2.7.2 > > > xml-apis > xml-apis > > > > > org.apache.hadoop > hadoop-common > 2.7.2 > > > > org.apache.flink > flink-hadoop-compatibility_2.12 > 1.7.1 > > > > org.apache.flink > flink-connector-filesystem_2.12 > 1.7.1 > > > > org.apache.flink > flink-connector-elasticsearch5_2.12 > 1.7.1 > > > > Hadoop environment version: 2.7.7 > >Reporter: gaojunjie >Priority: Major > > java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are > only supported for HDFS and for Hadoop version 2.7 or newer > at > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(HadoopRecoverableWriter.java:57) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112) > at > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242) > at > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:748) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10130) How to define two hdfs name-node IPs in flink-conf.yaml file
[ https://issues.apache.org/jira/browse/FLINK-10130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814097#comment-16814097 ] Liya Fan commented on FLINK-10130: -- [~Paul Lin] is right. HDFS name service solves the problem. So this issue can be closed. > How to define two hdfs name-node IPs in flink-conf.yaml file > > > Key: FLINK-10130 > URL: https://issues.apache.org/jira/browse/FLINK-10130 > Project: Flink > Issue Type: Bug > Components: FileSystems >Reporter: Keshav Lodhi >Priority: Major > Attachments: docker-entrypoints.sh > > > Hi Team, > Here is, what we are looking for: > * We have flink HA dockerized cluster with (3 zookeepers, 2 job-managers, 3 > task-managers). > * We are using HDFS from the flink to store some data. The problem we are > facing is that, we are not able to pass 2 name-node IPs in config. > * These are the config parameters we want to add two name-node IPs > # "state.checkpoints.dir: hdfs://X.X.X.X:9001/flinkdatastorage" > # "state.backend.fs.checkpointdir: > hdfs://X.X.X.X:9001/flinkdatastorage" > # "high-availability.zookeeper.storageDir: > hdfs://X.X.X.X:9001/flink/recovery" > Currently we are passing only one name-node IP > Please advise. > I have attached the sample *docker-entrypoint.sh* file: > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
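For reference, the HDFS nameservice approach mentioned by [~Paul Lin] lets clients address the NameNode pair by one logical name instead of a single IP, with client-side failover between the two. A minimal hdfs-site.xml sketch; the nameservice name "mycluster" and the host names are placeholders, not values from this issue:

{code}
<!-- hdfs-site.xml: hypothetical nameservice "mycluster" with two NameNodes -->
<property><name>dfs.nameservices</name><value>mycluster</value></property>
<property><name>dfs.ha.namenodes.mycluster</name><value>nn1,nn2</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn1</name><value>namenode1:9001</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn2</name><value>namenode2:9001</value></property>
<property>
  <name>dfs.client.failover.proxy.provider.mycluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
{code}

The Flink settings then reference the nameservice rather than either IP, e.g. state.checkpoints.dir: hdfs://mycluster/flinkdatastorage.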
[jira] [Updated] (FLINK-10391) MillisOfDay is used in place of instant for LocalTime ctor in AvroKryoSerializerUtils
[ https://issues.apache.org/jira/browse/FLINK-10391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liya Fan updated FLINK-10391: - Attachment: image-2019-04-10-13-54-13-788.png > MillisOfDay is used in place of instant for LocalTime ctor in > AvroKryoSerializerUtils > - > > Key: FLINK-10391 > URL: https://issues.apache.org/jira/browse/FLINK-10391 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Reporter: Ted Yu >Priority: Minor > Attachments: image-2019-04-10-13-54-13-788.png > > > From the JodaLocalTimeSerializer#write, we serialize getMillisOfDay() value > from LocalTime. > For read method: > {code} > final int time = input.readInt(true); > return new LocalTime(time, > ISOChronology.getInstanceUTC().withZone(DateTimeZone.UTC)); > {code} > It seems > http://joda-time.sourceforge.net/apidocs/org/joda/time/LocalTime.html#fromMillisOfDay(long,%20org.joda.time.Chronology) > should be used instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10391) MillisOfDay is used in place of instant for LocalTime ctor in AvroKryoSerializerUtils
[ https://issues.apache.org/jira/browse/FLINK-10391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814075#comment-16814075 ] Liya Fan commented on FLINK-10391: -- The current implementation already uses org.joda.time.LocalTime.fromMillisOfDay. So this issue can be closed. !image-2019-04-10-13-54-13-788.png! > MillisOfDay is used in place of instant for LocalTime ctor in > AvroKryoSerializerUtils > - > > Key: FLINK-10391 > URL: https://issues.apache.org/jira/browse/FLINK-10391 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Reporter: Ted Yu >Priority: Minor > Attachments: image-2019-04-10-13-54-13-788.png > > > From the JodaLocalTimeSerializer#write, we serialize getMillisOfDay() value > from LocalTime. > For read method: > {code} > final int time = input.readInt(true); > return new LocalTime(time, > ISOChronology.getInstanceUTC().withZone(DateTimeZone.UTC)); > {code} > It seems > http://joda-time.sourceforge.net/apidocs/org/joda/time/LocalTime.html#fromMillisOfDay(long,%20org.joda.time.Chronology) > should be used instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
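The confusion behind this issue (a millis-of-day count passed where an epoch instant is expected) can be illustrated with java.time, used here instead of Joda-Time so the sketch is self-contained; in Joda terms, new LocalTime(long, Chronology) reads its argument as an instant, while LocalTime.fromMillisOfDay(long) reads it as a time of day.

```java
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneOffset;

// Illustrates the millis-of-day vs. epoch-instant confusion behind FLINK-10391.
public class MillisOfDayDemo {
    public static void main(String[] args) {
        long millisOfDay = 3_600_000L; // 01:00:00, counted as millis since midnight

        // Correct: interpret the value as millis-of-day.
        LocalTime right = LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L);

        // Wrong in general: interpret the same value as an epoch instant.
        // At UTC the two readings coincide (epoch millis mod one day equals
        // millis-of-day), which is why the original bug was hard to notice,
        // but the instant reading silently depends on the zone:
        LocalTime viaInstantUtc =
            Instant.ofEpochMilli(millisOfDay).atZone(ZoneOffset.UTC).toLocalTime();
        LocalTime viaInstantOffset =
            Instant.ofEpochMilli(millisOfDay).atZone(ZoneOffset.ofHours(5)).toLocalTime();

        System.out.println(right);            // 01:00
        System.out.println(viaInstantUtc);    // 01:00 (coincidence at UTC)
        System.out.println(viaInstantOffset); // 06:00 (zone leaks into the result)
    }
}
```

This is why fromMillisOfDay is the right call even when the UTC chronology makes the two interpretations agree: the intent is a time of day, not a point on the timeline.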