[jira] [Commented] (FLINK-15538) Separate decimal implementations into separate sub-classes

2020-01-09 Thread Liya Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012394#comment-17012394
 ] 

Liya Fan commented on FLINK-15538:
--

[~ykt836] Good point. 

Admittedly, it is hard to say in general which is more expensive, a virtual 
function call or a branch statement. It depends on environmental factors and 
JIT behavior. 

Some results [1] suggest that the performance of bimorphic virtual function 
calls (a virtual function with two possible implementations) is not much worse 
than that of a final function call, and is much better than that of a general 
(megamorphic) virtual function call. 

Another benefit is that the subclasses do not contain irrelevant data fields 
(e.g. LooseDecimal does not hold the long value). 

[1] 
http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
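
To make the intended structure concrete, here is a minimal Java sketch of the 
split. CompactDecimal is a hypothetical name for the long-backed subclass 
(LooseDecimal is mentioned above); the actual fields and methods in Flink may 
differ.

public abstract class Decimal {
    final int precision;
    final int scale;

    Decimal(int precision, int scale) {
        this.precision = precision;
        this.scale = scale;
    }

    // Each subclass carries only the representation it needs,
    // so no if-else branch on the representation is required.
    public abstract java.math.BigDecimal toBigDecimal();
}

// Backed by an unscaled long value (hypothetical name).
final class CompactDecimal extends Decimal {
    private final long longVal;

    CompactDecimal(long longVal, int precision, int scale) {
        super(precision, scale);
        this.longVal = longVal;
    }

    @Override
    public java.math.BigDecimal toBigDecimal() {
        return java.math.BigDecimal.valueOf(longVal, scale);
    }
}

// Backed by a BigDecimal; does not hold the long value.
final class LooseDecimal extends Decimal {
    private final java.math.BigDecimal decimalVal;

    LooseDecimal(java.math.BigDecimal decimalVal, int precision, int scale) {
        super(precision, scale);
        this.decimalVal = decimalVal;
    }

    @Override
    public java.math.BigDecimal toBigDecimal() {
        return decimalVal;
    }
}

With this layout, calls such as toBigDecimal() are bimorphic virtual calls, 
which is exactly the case discussed above.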

> Separate decimal implementations into separate sub-classes
> --
>
> Key: FLINK-15538
> URL: https://issues.apache.org/jira/browse/FLINK-15538
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The current implementation of Decimal values has two (somewhat independent) 
> implementations: one is based on Long, while the other is based on 
> BigDecimal. 
> This makes the Decimal class unclear (both implementations are cluttered in a 
> single class) and less efficient (each method involves an if-else branch). 
> So in this issue, we make Decimal an abstract class and separate the two 
> implementations into two sub-classes. This makes the code clearer and more 
> efficient. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15538) Separate decimal implementations into separate sub-classes

2020-01-09 Thread Liya Fan (Jira)
Liya Fan created FLINK-15538:


 Summary: Separate decimal implementations into separate sub-classes
 Key: FLINK-15538
 URL: https://issues.apache.org/jira/browse/FLINK-15538
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Reporter: Liya Fan


The current implementation of Decimal values has two (somewhat independent) 
implementations: one is based on Long, while the other is based on BigDecimal. 

This makes the Decimal class unclear (both implementations are cluttered in a 
single class) and less efficient (each method involves an if-else branch). 

So in this issue, we make Decimal an abstract class and separate the two 
implementations into two sub-classes. This makes the code clearer and more 
efficient. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14731) LogicalWatermarkAssigner should use specified trait set when doing copy

2019-11-12 Thread Liya Fan (Jira)
Liya Fan created FLINK-14731:


 Summary: LogicalWatermarkAssigner should use specified trait set 
when doing copy
 Key: FLINK-14731
 URL: https://issues.apache.org/jira/browse/FLINK-14731
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Liya Fan


In the LogicalWatermarkAssigner#copy method, the new LogicalWatermarkAssigner 
object should be created with the trait set from the input parameter, instead 
of the trait set of the current object. 
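
As a rough, hypothetical illustration (the constructor arguments below are 
placeholders, and the actual class in the Blink planner may be written in 
Scala), the fix amounts to passing the supplied trait set through instead of 
reusing the current node's traits:

@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
    // Before (buggy): the copy reused getTraitSet() of the current object.
    // After: use the trait set supplied by the caller.
    return new LogicalWatermarkAssigner(
        getCluster(), traitSet, inputs.get(0), rowtimeField, watermarkDelay);
}

Here rowtimeField and watermarkDelay are hypothetical constructor arguments 
used only to complete the sketch.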



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13221) Blink planner should set ScheduleMode to LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs

2019-07-21 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889884#comment-16889884
 ] 

Liya Fan commented on FLINK-13221:
--

[~ykt836]. Sure. Thanks a lot.

> Blink planner should set ScheduleMode to 
> LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs
> -
>
> Key: FLINK-13221
> URL: https://issues.apache.org/jira/browse/FLINK-13221
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Kurt Young
>Assignee: Liya Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (FLINK-13221) Blink planner should set ScheduleMode to LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs

2019-07-11 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan reassigned FLINK-13221:


Assignee: Liya Fan

> Blink planner should set ScheduleMode to 
> LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs
> -
>
> Key: FLINK-13221
> URL: https://issues.apache.org/jira/browse/FLINK-13221
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Kurt Young
>Assignee: Liya Fan
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13200) Improve the generated code for if statements

2019-07-10 Thread Liya Fan (JIRA)
Liya Fan created FLINK-13200:


 Summary: Improve the generated code for if statements
 Key: FLINK-13200
 URL: https://issues.apache.org/jira/browse/FLINK-13200
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Liya Fan
Assignee: Liya Fan


In the generated code, we often see code snippets like this:

 

if (true) {
  acc$6.setNullAt(1);
} else {
  acc$6.setField(1, ((int) -1));;
}

Such code hurts readability and increases the code size, making it more costly 
to compile and to transfer over the network.

 

In this issue, we remove such useless if conditions. 
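
For illustration, because the condition is the constant true, the snippet above 
could simply be generated as:

acc$6.setNullAt(1);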



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13108) Remove duplicated type cast in generated code

2019-07-04 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-13108:
-
Description: 
There are duplicated cast operations in the generated code. For example, to run 
org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the 
generated code looks like this:

{{@Override}}
{{public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element) throws Exception {}}
  {{org.apache.flink.table.dataformat.BaseRow in1 = 
(org.apache.flink.table.dataformat.BaseRow) 
(org.apache.flink.table.dataformat.BaseRow) 
converter$0.toInternal((org.apache.flink.types.Row) element.getValue());}}
  {{...}}

This issue removes the duplicated type casts.

  was:
There are duplicated cast operations in the generated code. For example, to run 
org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the 
generated code looks like this:

{{@Override}}
{{public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element) throws Exception {}}
  {{org.apache.flink.table.dataformat.BaseRow in1 = 
(org.apache.flink.table.dataformat.BaseRow) 
(org.apache.flink.table.dataformat.BaseRow) 
converter$0.toInternal((org.apache.flink.types.Row) element.getValue());}}
  {{...}}

This issue remove the duplicated type casts.


> Remove duplicated type cast in generated code
> -
>
> Key: FLINK-13108
> URL: https://issues.apache.org/jira/browse/FLINK-13108
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There are duplicated cast operations in the generated code. For example, to 
> run org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the 
> generated code looks like this:
> {{@Override}}
> {{public void 
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
> element) throws Exception {}}
>   {{org.apache.flink.table.dataformat.BaseRow in1 = 
> (org.apache.flink.table.dataformat.BaseRow) 
> (org.apache.flink.table.dataformat.BaseRow) 
> converter$0.toInternal((org.apache.flink.types.Row) element.getValue());}}
>   {{...}}
> This issue removes the duplicated type casts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13108) Remove duplicated type cast in generated code

2019-07-04 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-13108:
-
Description: 
There are duplicated cast operations in the generated code. For example, to run 
org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the 
generated code looks like this:

{{@Override}}
{{public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element) throws Exception {}}
  {{org.apache.flink.table.dataformat.BaseRow in1 = 
(org.apache.flink.table.dataformat.BaseRow) 
(org.apache.flink.table.dataformat.BaseRow) 
converter$0.toInternal((org.apache.flink.types.Row) element.getValue());}}
  {{...}}

This issue removes the duplicated type casts.

  was:
There are duplicated cast operations in the generated code. For example, to run 
org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the 
generated code looks like this:

{{@Override}}
{{public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element) throws Exception {}}
  org.apache.flink.table.dataformat.BaseRow in1 = 
(org.apache.flink.table.dataformat.BaseRow) 
(org.apache.flink.table.dataformat.BaseRow) 
converter$0.toInternal((org.apache.flink.types.Row) element.getValue());

This issue remove the duplicated type casts.


> Remove duplicated type cast in generated code
> -
>
> Key: FLINK-13108
> URL: https://issues.apache.org/jira/browse/FLINK-13108
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There are duplicated cast operations in the generated code. For example, to 
> run org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the 
> generated code looks like this:
> {{@Override}}
> {{public void 
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
> element) throws Exception {}}
>   {{org.apache.flink.table.dataformat.BaseRow in1 = 
> (org.apache.flink.table.dataformat.BaseRow) 
> (org.apache.flink.table.dataformat.BaseRow) 
> converter$0.toInternal((org.apache.flink.types.Row) element.getValue());}}
>   {{...}}
> This issue removes the duplicated type casts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13108) Remove duplicated type cast in generated code

2019-07-04 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-13108:
-
Description: 
There are duplicated cast operations in the generated code. For example, to run 
org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the 
generated code looks like this:

{{@Override}}
{{public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element) throws Exception {}}
  org.apache.flink.table.dataformat.BaseRow in1 = 
(org.apache.flink.table.dataformat.BaseRow) 
(org.apache.flink.table.dataformat.BaseRow) 
converter$0.toInternal((org.apache.flink.types.Row) element.getValue());

This issue removes the duplicated type casts.

  was:
There are duplicated cast operations in the generated code. For example, to run 
org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the 
generated code looks like this:

@Override
public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element) throws Exception {
  org.apache.flink.table.dataformat.BaseRow in1 = 
(org.apache.flink.table.dataformat.BaseRow) 
(org.apache.flink.table.dataformat.BaseRow) 
converter$0.toInternal((org.apache.flink.types.Row) element.getValue());

This issue remove the duplicated type cast.


> Remove duplicated type cast in generated code
> -
>
> Key: FLINK-13108
> URL: https://issues.apache.org/jira/browse/FLINK-13108
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There are duplicated cast operations in the generated code. For example, to 
> run org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the 
> generated code looks like this:
> {{@Override}}
> {{public void 
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
> element) throws Exception {}}
>   org.apache.flink.table.dataformat.BaseRow in1 = 
> (org.apache.flink.table.dataformat.BaseRow) 
> (org.apache.flink.table.dataformat.BaseRow) 
> converter$0.toInternal((org.apache.flink.types.Row) element.getValue());
> This issue removes the duplicated type casts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13108) Remove duplicated type cast in generated code

2019-07-04 Thread Liya Fan (JIRA)
Liya Fan created FLINK-13108:


 Summary: Remove duplicated type cast in generated code
 Key: FLINK-13108
 URL: https://issues.apache.org/jira/browse/FLINK-13108
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Liya Fan
Assignee: Liya Fan


There are duplicated cast operations in the generated code. For example, to run 
org.apache.flink.table.runtime.batch.sql.join.JoinITCase#testJoin, the 
generated code looks like this:

@Override
public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element) throws Exception {
  org.apache.flink.table.dataformat.BaseRow in1 = 
(org.apache.flink.table.dataformat.BaseRow) 
(org.apache.flink.table.dataformat.BaseRow) 
converter$0.toInternal((org.apache.flink.types.Row) element.getValue());

This issue removes the duplicated type casts.
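
For illustration, after the fix the assignment would contain a single cast, 
roughly:

  org.apache.flink.table.dataformat.BaseRow in1 = 
(org.apache.flink.table.dataformat.BaseRow) 
converter$0.toInternal((org.apache.flink.types.Row) element.getValue());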



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13058) Avoid memory copy for the trimming operations of BinaryString

2019-07-02 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-13058:
-
Description: For trimming operations of BinaryString (trim, trimLeft, 
trimRight), if the trimmed string is identical to the original string (which is 
likely to happen in practice), the memory copy can be avoided by directly 
returning the original string.   (was: For trimming operations of BinaryString 
(trim, trimLeft, trimRight), if the trimmed string is identical to the original 
string. The memory copy can be avoided by directly returning the original 
string. )

> Avoid memory copy for the trimming operations of BinaryString
> -
>
> Key: FLINK-13058
> URL: https://issues.apache.org/jira/browse/FLINK-13058
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Minor
>
> For trimming operations of BinaryString (trim, trimLeft, trimRight), if the 
> trimmed string is identical to the original string (which is likely to happen 
> in practice), the memory copy can be avoided by directly returning the 
> original string. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13058) Avoid memory copy for the trimming operations of BinaryString

2019-07-02 Thread Liya Fan (JIRA)
Liya Fan created FLINK-13058:


 Summary: Avoid memory copy for the trimming operations of 
BinaryString
 Key: FLINK-13058
 URL: https://issues.apache.org/jira/browse/FLINK-13058
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Reporter: Liya Fan
Assignee: Liya Fan


For trimming operations of BinaryString (trim, trimLeft, trimRight), if the 
trimmed string is identical to the original string, the memory copy can be 
avoided by directly returning the original string. 
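
A minimal sketch of the idea (the helper methods below are hypothetical, not 
the actual BinaryString API):

public BinaryString trim() {
    int start = firstNonSpaceIndex();   // hypothetical helper
    int end = lastNonSpaceIndex();      // hypothetical helper
    if (start == 0 && end == numBytes() - 1) {
        // The trimmed string is identical to the original;
        // return it directly and skip the memory copy.
        return this;
    }
    return copyRange(start, end + 1);   // hypothetical copying helper
}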



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13053) Vectorization Support in Flink

2019-07-02 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-13053:
-
Description: 
Vectorization is a popular technique in SQL engines today. Compared with the 
traditional row-based approach, it has some distinct advantages, for example:

 
 * Better use of CPU resources (e.g. SIMD)
 * More compact memory layout
 * More friendly to compressed data formats.

 

Currently, Flink is based on a row-based SQL engine for both stream and batch 
workloads. To enjoy the above benefits, we want to bring vectorization to 
Flink. This involves substantial changes to the existing code base. Therefore, 
we propose a plan to carry out such changes in small, incremental steps, so as 
not to affect existing features. We want to apply it to batch workloads first. 
The details can be found in our 
[proposal|https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb]
 .

 

Over the past months, we have developed an initial implementation of the above 
ideas. Initial performance evaluations on TPC-H benchmarks show that 
substantial performance improvements can be obtained by vectorization (see the 
figure below). More details can be found in our 
[proposal|https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb].

  !image-2019-07-02-15-26-39-550.png!

Special thanks to @Kurt Young’s team for all the kind help.

Special thanks to @Piotr Nowojski for all the valuable feedback and helpful 
suggestions.

  was:
Vectorization is a popular technique in SQL engines today. Compared with 
traditional row-based approach, it has some distinct advantages, for example:

 
 * Better use of CPU resources (e.g. SIMD)
 * More compact memory layout
 * More friendly to compressed data format.

 

Currently, Flink is based on a row-based SQL engine for both stream and batch 
workloads. To enjoy the above benefits, we want to bring vectorization to 
Flink. This involves substantial changes to the existing code base. Therefore, 
we give a plan to carry out such changes in small, incremental steps, in order 
not to affect existing features. We want to apply it to batch workload first. 
The details can be found in our proposal 
(https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb)
 .

 

For the past months, we have developed an initial implementation of the above 
ideas. Initial performance evaluations on TPC-H benchmarks show that 
substantial performance improvements can be obtained by vectorization (see the 
figure below). More details can be found in our proposal 
(https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb).

  !image-2019-07-02-15-26-39-550.png!

Special thanks to @Kurt Young’s team for all the kind help.

Special thanks to @Piotr Nowojski for all the valuable feedback and help 
suggestions.


> Vectorization Support in Flink
> --
>
> Key: FLINK-13053
> URL: https://issues.apache.org/jira/browse/FLINK-13053
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Critical
> Attachments: image-2019-07-02-15-26-39-550.png
>
>
> Vectorization is a popular technique in SQL engines today. Compared with 
> traditional row-based approach, it has some distinct advantages, for example:
>  
>  * Better use of CPU resources (e.g. SIMD)
>  * More compact memory layout
>  * More friendly to compressed data format.
>  
> Currently, Flink is based on a row-based SQL engine for both stream and batch 
> workloads. To enjoy the above benefits, we want to bring vectorization to 
> Flink. This involves substantial changes to the existing code base. 
> Therefore, we give a plan to carry out such changes in small, incremental 
> steps, in order not to affect existing features. We want to apply it to batch 
> workload first. The details can be found in our 
> [proposal|https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb]
>  .
>  
> For the past months, we have developed an initial implementation of the above 
> ideas. Initial performance evaluations on TPC-H benchmarks show that 
> substantial performance improvements can be obtained by vectorization (see 
> the figure below). More details can be found in our 
> [proposal|https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb].
>   !image-2019-07-02-15-26-39-550.png!
> Special thanks to @Kurt Young’s team for all the kind help.
> Special thanks to @Piotr Nowojski for all the valuable feedback and help 
> suggestions.



--
This message was sent by Atlassian JIRA

[jira] [Updated] (FLINK-13053) Vectorization Support in Flink

2019-07-02 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-13053:
-
Description: 
Vectorization is a popular technique in SQL engines today. Compared with 
traditional row-based approach, it has some distinct advantages, for example:

 
 * Better use of CPU resources (e.g. SIMD)
 * More compact memory layout
 * More friendly to compressed data format.

 

Currently, Flink is based on a row-based SQL engine for both stream and batch 
workloads. To enjoy the above benefits, we want to bring vectorization to 
Flink. This involves substantial changes to the existing code base. Therefore, 
we give a plan to carry out such changes in small, incremental steps, in order 
not to affect existing features. We want to apply it to batch workload first. 
The details can be found in our proposal 
(https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb)
 .

 

For the past months, we have developed an initial implementation of the above 
ideas. Initial performance evaluations on TPC-H benchmarks show that 
substantial performance improvements can be obtained by vectorization (see the 
figure below). More details can be found in our proposal 
(https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb).

  !image-2019-07-02-15-26-39-550.png!

Special thanks to @Kurt Young’s team for all the kind help.

Special thanks to @Piotr Nowojski for all the valuable feedback and help 
suggestions.

  was:
Vectorization is a popular technique in SQL engines today. Compared with 
traditional row-based approach, it has some distinct advantages, for example:

 
 * Better use of CPU resources (e.g. SIMD)
 * More compact memory layout
 * More friendly to compressed data format.

 

Currently, Flink is based on a row-based SQL engine for both stream and batch 
workloads. To enjoy the above benefits, we want to bring vectorization to 
Flink. This involves substantial changes to the existing code base. Therefore, 
we give a plan to carry out such changes in small, incremental steps, in order 
not to affect existing features. We want to apply it to batch workload first. 
The details can be found in our proposal.

 

For the past months, we have developed an initial implementation of the above 
ideas. Initial performance evaluations on TPC-H benchmarks show that 
substantial performance improvements can be obtained by vectorization (see the 
figure below). More details can be found in our proposal.

  !image-2019-07-02-15-26-39-550.png!

Special thanks to @Kurt Young’s team for all the kind help.

Special thanks to @Piotr Nowojski for all the valuable feedback and help 
suggestions.


> Vectorization Support in Flink
> --
>
> Key: FLINK-13053
> URL: https://issues.apache.org/jira/browse/FLINK-13053
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Critical
> Attachments: image-2019-07-02-15-26-39-550.png
>
>
> Vectorization is a popular technique in SQL engines today. Compared with 
> traditional row-based approach, it has some distinct advantages, for example:
>  
>  * Better use of CPU resources (e.g. SIMD)
>  * More compact memory layout
>  * More friendly to compressed data format.
>  
> Currently, Flink is based on a row-based SQL engine for both stream and batch 
> workloads. To enjoy the above benefits, we want to bring vectorization to 
> Flink. This involves substantial changes to the existing code base. 
> Therefore, we give a plan to carry out such changes in small, incremental 
> steps, in order not to affect existing features. We want to apply it to batch 
> workload first. The details can be found in our proposal 
> (https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb)
>  .
>  
> For the past months, we have developed an initial implementation of the above 
> ideas. Initial performance evaluations on TPC-H benchmarks show that 
> substantial performance improvements can be obtained by vectorization (see 
> the figure below). More details can be found in our proposal 
> (https://docs.google.com/document/d/1cUHb-_Pbe4NMU3Igwt4tytEmI66jQxev00IL99e2wFY/edit#heading=h.50xdeg1htedb).
>   !image-2019-07-02-15-26-39-550.png!
> Special thanks to @Kurt Young’s team for all the kind help.
> Special thanks to @Piotr Nowojski for all the valuable feedback and help 
> suggestions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13053) Vectorization Support in Flink

2019-07-02 Thread Liya Fan (JIRA)
Liya Fan created FLINK-13053:


 Summary: Vectorization Support in Flink
 Key: FLINK-13053
 URL: https://issues.apache.org/jira/browse/FLINK-13053
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Runtime
Reporter: Liya Fan
Assignee: Liya Fan
 Attachments: image-2019-07-02-15-26-39-550.png

Vectorization is a popular technique in SQL engines today. Compared with the 
traditional row-based approach, it has some distinct advantages, for example:

 
 * Better use of CPU resources (e.g. SIMD)
 * More compact memory layout
 * More friendly to compressed data formats.

 

Currently, Flink is based on a row-based SQL engine for both stream and batch 
workloads. To enjoy the above benefits, we want to bring vectorization to 
Flink. This involves substantial changes to the existing code base. Therefore, 
we propose a plan to carry out such changes in small, incremental steps, so as 
not to affect existing features. We want to apply it to batch workloads first. 
The details can be found in our proposal.

 

Over the past months, we have developed an initial implementation of the above 
ideas. Initial performance evaluations on TPC-H benchmarks show that 
substantial performance improvements can be obtained by vectorization (see the 
figure below). More details can be found in our proposal.

  !image-2019-07-02-15-26-39-550.png!

Special thanks to @Kurt Young’s team for all the kind help.

Special thanks to @Piotr Nowojski for all the valuable feedback and helpful 
suggestions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13043) Fix the bug of parsing Dewey number from string

2019-07-01 Thread Liya Fan (JIRA)
Liya Fan created FLINK-13043:


 Summary: Fix the bug of parsing Dewey number from string
 Key: FLINK-13043
 URL: https://issues.apache.org/jira/browse/FLINK-13043
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Reporter: Liya Fan
Assignee: Liya Fan


There is a bug in the current implementation for parsing a Dewey number from a 
string:

 

String[] splits = deweyNumberString.split("\\.");

if (splits.length == 0) {
    return new DeweyNumber(Integer.parseInt(deweyNumberString));
}

 

The length in the if condition should be 1 instead of 0: a Dewey number with a 
single component yields an array of length 1 after the split, so the current 
condition never matches the case it is meant to handle.
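
For clarity, the corrected branch would read (keeping the rest of the parsing 
logic unchanged):

String[] splits = deweyNumberString.split("\\.");

if (splits.length == 1) {
    return new DeweyNumber(Integer.parseInt(deweyNumberString));
}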

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12628) Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism

2019-07-01 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan reassigned FLINK-12628:


Assignee: Liya Fan

> Check test failure if partition has no consumers in 
> Execution.getPartitionMaxParallelism
> 
>
> Key: FLINK-12628
> URL: https://issues.apache.org/jira/browse/FLINK-12628
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Andrey Zagrebin
>Assignee: Liya Fan
>Priority: Major
>
> Currently, we work around this case in Execution.getPartitionMaxParallelism 
> because of tests:
> // TODO consumers.isEmpty() only exists for test, currently there has to be 
> exactly one consumer in real jobs!
> though partition is supposed to have always at least one consumer atm.
> We should check which test fails and consider fixing it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12922) Remove method parameter from OperatorCodeGenerator

2019-06-21 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12922:


 Summary: Remove method parameter from OperatorCodeGenerator
 Key: FLINK-12922
 URL: https://issues.apache.org/jira/browse/FLINK-12922
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Liya Fan
Assignee: Liya Fan


The TableConfig parameter of 
OperatorCodeGenerator#generateOneInputStreamOperator should be removed, because:
 # This parameter is never actually used.
 # If it is ever used in the future, we can use ctx.getConfig to get the same 
object.
 # The method signatures should be consistent: generateTwoInputStreamOperator 
does not have this parameter, so it should be removed here as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12886) Support container memory segment

2019-06-20 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16869126#comment-16869126
 ] 

Liya Fan edited comment on FLINK-12886 at 6/21/19 3:27 AM:
---

[~ykt836], [~lzljs3620320]

Another advantage I can think of is in-place expansion. In some scenarios, we 
need to expand the memory capacity, for example:
 # When we write a big string to a BinaryRowWriter, and the memory segments of 
the writer are not sufficient to hold the new data.
 # When we rehash in a hash table.

The current steps for expanding a memory space are as follows:
 # Create a new memory space with a larger size.
 # Copy the data from the old memory space to the new memory space.

The problems with this approach:
 # The memory copy incurs a performance overhead.
 # The memory requirement is too big.

Let me illustrate problem 2 with a concrete example: suppose we have 100 MB of 
memory in total, and we are currently using a 40 MB memory space. We want to 
expand the memory space from 40 MB to 80 MB. According to the above steps, the 
maximum memory requirement during the expansion is 40 + 80 = 120 MB, which is 
infeasible because we only have 100 MB. So although the total memory (100 MB) 
is sufficient to fulfill our request (80 MB), the expansion must fail.

 

This problem can be resolved by ContainerMemorySegment: we simply allocate 
another 40 MB of memory and append it to the end of the existing memory 
segments held by the ContainerMemorySegment. Note that the maximum memory 
requirement during the process is only 80 MB, so our 100 MB of memory can serve 
the request well. In addition, there is no need for the memory copy, so the 
performance overhead no longer exists.

 

What do you think?
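
A minimal, hypothetical sketch of the idea (the class and method names below 
are placeholders, not Flink classes): the container grows by appending 
fixed-size segments, so existing data is never copied.

// Hypothetical sketch of in-place expansion; not actual Flink code.
final class ContainerOfSegments {
    private final java.util.List<byte[]> segments = new java.util.ArrayList<>();
    private final int segmentSize;

    ContainerOfSegments(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    // Grow by appending new segments; no existing bytes are moved or copied.
    void expandTo(long newCapacity) {
        while ((long) segments.size() * segmentSize < newCapacity) {
            segments.add(new byte[segmentSize]);
        }
    }

    byte get(long index) {
        return segments.get((int) (index / segmentSize))[(int) (index % segmentSize)];
    }
}

In the 100 MB example above, expanding from 40 MB to 80 MB only allocates the 
additional 40 MB of segments, so the peak requirement stays at 80 MB.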


was (Author: fan_li_ya):
[~ykt836], [~lzljs3620320]

Another advantage I can think of is in-place expanding. In some scenarios, we 
need to expand the memory capacity, for example:
 # When we write a big string to a BinaryRowWriter, and the memory segment of 
the writer is not sufficient to hold the new data.
 # When we do rehash in a hash table

The current steps of expanding memory space is as follows:
 # Create a new memory space with a large size
 # Copy the data form the old memory space to the new memory space.

The problems with this approach:
 # The memory copy incurs performance overhead
 # The memory requirement is too big.

Let me illustrate problem 2 with a concrete example: suppose we have totally 
100 MB memory, and currently we are using a 40 MB memory space. We want to 
expand the memory space from 40 MB to 80 MB. According to the above steps, this 
will require 40 + 80 = 120 MB memory, which is infeasible, because we only have 
100 MB memory. So although the memory space (100 MB) is sufficient to fulfill 
our request (80 MB), the expand must fail.

 

This problem can be resolved by ContainerMemorySegment: we simple allocate 
another 40 MB memory and append them to the end of the existing memory segments 
held by the ContainerMemorySegment. Our 100 MB memory space will serve the 
requests well. In addition, there is no need for the memory copy, so the 
performance overhead is removed.

 

What do you think?

> Support container memory segment
> 
>
> Key: FLINK-12886
> URL: https://issues.apache.org/jira/browse/FLINK-12886
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2019-06-18-17-59-42-136.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We observe that in many scenarios, the operations/algorithms are based on an 
> array of MemorySegment. These memory segments form a large, combined, and 
> continuous memory space.
> For example, suppose we have an array of n memory segments. Memory addresses 
> from 0 to segment_size - 1 are served by the first memory segment; memory 
> addresses from segment_size to 2 * segment_size - 1 are served by the second 
> memory segment, and so on.
> Specific algorithms decide the actual MemorySegment to serve the operation 
> requests. For some rare cases, two or more memory segments serve the 
> requests. There are many operations based on such a paradigm, for example, 
> {{BinaryString#matchAt}}, {{SegmentsUtil#copyToBytes}}, 
> {{LongHashPartition#MatchIterator#get}}, etc.
> The problem is that, for memory segment array based operations, large amounts 
> of code is devoted to
> 1. Computing the memory segment index & offset within the memory segment.
>  2. Processing boundary cases. For example, to write an integer, there are 
> only 2 bytes left in the first memory segment, and the remaining 2 bytes must 
> be written to the next memory segment.
>  3. Differentiate 

[jira] [Commented] (FLINK-12886) Support container memory segment

2019-06-20 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16869126#comment-16869126
 ] 

Liya Fan commented on FLINK-12886:
--

[~ykt836], [~lzljs3620320]

Another advantage I can think of is in-place expanding. In some scenarios, we 
need to expand the memory capacity, for example:
 # When we write a big string to a BinaryRowWriter, and the memory segment of 
the writer is not sufficient to hold the new data.
 # When we do rehash in a hash table

The current steps of expanding memory space is as follows:
 # Create a new memory space with a large size
 # Copy the data form the old memory space to the new memory space.

The problems with this approach:
 # The memory copy incurs performance overhead
 # The memory requirement is too big.

Let me illustrate problem 2 with a concrete example: suppose we have totally 
100 MB memory, and currently we are using a 40 MB memory space. We want to 
expand the memory space from 40 MB to 80 MB. According to the above steps, this 
will require 40 + 80 = 120 MB memory, which is infeasible, because we only have 
100 MB memory. So although the memory space (100 MB) is sufficient to fulfill 
our request (80 MB), the expand must fail.

 

This problem can be resolved by ContainerMemorySegment: we simple allocate 
another 40 MB memory and append them to the end of the existing memory segments 
held by the ContainerMemorySegment. Our 100 MB memory space will serve the 
requests well. In addition, there is no need for the memory copy, so the 
performance overhead is removed.

 

What do you think?

> Support container memory segment
> 
>
> Key: FLINK-12886
> URL: https://issues.apache.org/jira/browse/FLINK-12886
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2019-06-18-17-59-42-136.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We observe that in many scenarios, the operations/algorithms are based on an 
> array of MemorySegment. These memory segments form a large, combined, and 
> continuous memory space.
> For example, suppose we have an array of n memory segments. Memory addresses 
> from 0 to segment_size - 1 are served by the first memory segment; memory 
> addresses from segment_size to 2 * segment_size - 1 are served by the second 
> memory segment, and so on.
> Specific algorithms decide the actual MemorySegment to serve the operation 
> requests. For some rare cases, two or more memory segments serve the 
> requests. There are many operations based on such a paradigm, for example, 
> {{BinaryString#matchAt}}, {{SegmentsUtil#copyToBytes}}, 
> {{LongHashPartition#MatchIterator#get}}, etc.
> The problem is that, for memory segment array based operations, large amounts 
> of code is devoted to
> 1. Computing the memory segment index & offset within the memory segment.
>  2. Processing boundary cases. For example, to write an integer, there are 
> only 2 bytes left in the first memory segment, and the remaining 2 bytes must 
> be written to the next memory segment.
>  3. Differentiate processing for short/long data. For example, when copying 
> memory data to a byte array. Different methods are implemented for cases when 
> 1) the data fits in a single segment; 2) the data spans multiple segments.
> Therefore, there are much duplicated code to achieve above purposes. What is 
> worse, this paradigm significantly increases the amount of code, making the 
> code more difficult to read and maintain. Furthermore, it easily gives rise 
> to bugs which difficult to find and debug.
> To address these problems, we propose a new type of memory segment: 
> {{ContainerMemorySegment}}. It is based on an array of underlying memory 
> segments with the same size. It extends from the {{MemorySegment}} base 
> class, so it provides all the functionalities provided by {{MemorySegment}}. 
> In addition, it hides all the details for dealing with specific memory 
> segments, and acts as if it were a big continuous memory region.
> A prototype implementation is given below:
>  !image-2019-06-18-17-59-42-136.png|thumbnail! 
> With this new type of memory segment, many operations/algorithms can be 
> greatly simplified, without affecting performance. This is because,
> 1. Many checks, boundary processing are already there. We just move them to 
> the new class.
>  2. We optimize the implementation of the new class, so the special 
> optimizations (e.g. optimizations for short data) are still preserved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12886) Support container memory segment

2019-06-20 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868505#comment-16868505
 ] 

Liya Fan commented on FLINK-12886:
--

[~lzljs3620320], thanks for your feedback and suggestion.

The original intention of this Jira is to:
 # Make the logic for manipulating memory segment arrays simpler.
 # Reduce redundant operations to make the code more readable and easier to 
maintain, and less likely to produce bugs.
 # Improve performance by reducing code complexity (see e.g. 
https://github.com/apache/flink/pull/8773).

Maybe I should illustrate the effects with an example in the code.

> Support container memory segment
> 
>
> Key: FLINK-12886
> URL: https://issues.apache.org/jira/browse/FLINK-12886
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2019-06-18-17-59-42-136.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We observe that in many scenarios, the operations/algorithms are based on an 
> array of MemorySegment. These memory segments form a large, combined, and 
> continuous memory space.
> For example, suppose we have an array of n memory segments. Memory addresses 
> from 0 to segment_size - 1 are served by the first memory segment; memory 
> addresses from segment_size to 2 * segment_size - 1 are served by the second 
> memory segment, and so on.
> Specific algorithms decide the actual MemorySegment to serve the operation 
> requests. For some rare cases, two or more memory segments serve the 
> requests. There are many operations based on such a paradigm, for example, 
> {{BinaryString#matchAt}}, {{SegmentsUtil#copyToBytes}}, 
> {{LongHashPartition#MatchIterator#get}}, etc.
> The problem is that, for memory segment array based operations, large amounts 
> of code is devoted to
> 1. Computing the memory segment index & offset within the memory segment.
>  2. Processing boundary cases. For example, to write an integer, there are 
> only 2 bytes left in the first memory segment, and the remaining 2 bytes must 
> be written to the next memory segment.
>  3. Differentiate processing for short/long data. For example, when copying 
> memory data to a byte array. Different methods are implemented for cases when 
> 1) the data fits in a single segment; 2) the data spans multiple segments.
> Therefore, there are much duplicated code to achieve above purposes. What is 
> worse, this paradigm significantly increases the amount of code, making the 
> code more difficult to read and maintain. Furthermore, it easily gives rise 
> to bugs which difficult to find and debug.
> To address these problems, we propose a new type of memory segment: 
> {{ContainerMemorySegment}}. It is based on an array of underlying memory 
> segments with the same size. It extends from the {{MemorySegment}} base 
> class, so it provides all the functionalities provided by {{MemorySegment}}. 
> In addition, it hides all the details for dealing with specific memory 
> segments, and acts as if it were a big continuous memory region.
> A prototype implementation is given below:
>  !image-2019-06-18-17-59-42-136.png|thumbnail! 
> With this new type of memory segment, many operations/algorithms can be 
> greatly simplified, without affecting performance. This is because,
> 1. Many checks, boundary processing are already there. We just move them to 
> the new class.
>  2. We optimize the implementation of the new class, so the special 
> optimizations (e.g. optimizations for short data) are still preserved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12886) Support container memory segment

2019-06-20 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868500#comment-16868500
 ] 

Liya Fan commented on FLINK-12886:
--

[~ykt836], thanks a lot for your feedback.

If we choose option 2, I think the value of this proposal is much smaller.

I agree that we should first discuss the merits of this utility class. This is 
what I can think of currently:

 

Advantages:
 # Clearer code structure: all operations for dealing with the memory segment 
set are centralized, so redundant code can be removed and the amount of code 
will be smaller.
 # Better performance can be achieved. Currently, some performance overhead is 
paid due to the overly complex memory segment set operations (see e.g. 
https://github.com/apache/flink/pull/8773).

Disadvantages:

 1. Existing methods need to be refactored; this will be a long, continuing 
process.

 

In fact, there is some overlap between the functionalities provided by 
ContainerMemorySegment and SegmentsUtil. Both classes have their merits, but I 
think ContainerMemorySegment has some advantages over SegmentsUtil:
 # It encapsulates state such as the segments, offset, and segment size, so it 
is more appropriate for object-oriented programming and leads to a better code 
structure.
 # It makes the state easier to manage, which makes bugs less likely to arise.

 

> Support container memory segment
> 
>
> Key: FLINK-12886
> URL: https://issues.apache.org/jira/browse/FLINK-12886
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2019-06-18-17-59-42-136.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We observe that in many scenarios, the operations/algorithms are based on an 
> array of MemorySegment. These memory segments form a large, combined, and 
> continuous memory space.
> For example, suppose we have an array of n memory segments. Memory addresses 
> from 0 to segment_size - 1 are served by the first memory segment; memory 
> addresses from segment_size to 2 * segment_size - 1 are served by the second 
> memory segment, and so on.
> Specific algorithms decide the actual MemorySegment to serve the operation 
> requests. For some rare cases, two or more memory segments serve the 
> requests. There are many operations based on such a paradigm, for example, 
> {{BinaryString#matchAt}}, {{SegmentsUtil#copyToBytes}}, 
> {{LongHashPartition#MatchIterator#get}}, etc.
> The problem is that, for memory segment array based operations, large amounts 
> of code is devoted to
> 1. Computing the memory segment index & offset within the memory segment.
>  2. Processing boundary cases. For example, to write an integer, there are 
> only 2 bytes left in the first memory segment, and the remaining 2 bytes must 
> be written to the next memory segment.
>  3. Differentiate processing for short/long data. For example, when copying 
> memory data to a byte array. Different methods are implemented for cases when 
> 1) the data fits in a single segment; 2) the data spans multiple segments.
> Therefore, there are much duplicated code to achieve above purposes. What is 
> worse, this paradigm significantly increases the amount of code, making the 
> code more difficult to read and maintain. Furthermore, it easily gives rise 
> to bugs which difficult to find and debug.
> To address these problems, we propose a new type of memory segment: 
> {{ContainerMemorySegment}}. It is based on an array of underlying memory 
> segments with the same size. It extends from the {{MemorySegment}} base 
> class, so it provides all the functionalities provided by {{MemorySegment}}. 
> In addition, it hides all the details for dealing with specific memory 
> segments, and acts as if it were a big continuous memory region.
> A prototype implementation is given below:
>  !image-2019-06-18-17-59-42-136.png|thumbnail! 
> With this new type of memory segment, many operations/algorithms can be 
> greatly simplified, without affecting performance. This is because,
> 1. Many checks, boundary processing are already there. We just move them to 
> the new class.
>  2. We optimize the implementation of the new class, so the special 
> optimizations (e.g. optimizations for short data) are still preserved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12886) Support container memory segment

2019-06-19 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868251#comment-16868251
 ] 

Liya Fan commented on FLINK-12886:
--

[~ykt836] [~lzljs3620320], two ideas to resolve the performance degradation. 
Would you please give some comments?

1. Let ContainerMemorySegment and MemorySegment extend a common super 
interface, which defines the basic operations for accessing data:

public interface MemoryAccessible {

    public int getInt(int index);

    public void setInt(int index, int value);

    ...
}

public class MemorySegment implements MemoryAccessible ...

public class ContainerMemorySegment implements MemoryAccessible ...

 

With this approach, the MemorySegment class hierarchy is unaffected, so code 
that depends on MemorySegment does not suffer a performance impact. In 
addition, code that expects a MemoryAccessible can accept both a MemorySegment 
and a ContainerMemorySegment.

 

2. ContainerMemorySegment no longer inherits from MemorySegment. In this way, 
ContainerMemorySegment just acts as a wrapper for a set of MemorySegments, so 
wherever a MemorySegment is expected, a ContainerMemorySegment cannot be 
provided. 

Also, ContainerMemorySegment can be moved to the blink-runtime module, because 
it is not a general MemorySegment.

 

 

 

> Support container memory segment
> 
>
> Key: FLINK-12886
> URL: https://issues.apache.org/jira/browse/FLINK-12886
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2019-06-18-17-59-42-136.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We observe that in many scenarios, the operations/algorithms are based on an 
> array of MemorySegment. These memory segments form a large, combined, and 
> continuous memory space.
> For example, suppose we have an array of n memory segments. Memory addresses 
> from 0 to segment_size - 1 are served by the first memory segment; memory 
> addresses from segment_size to 2 * segment_size - 1 are served by the second 
> memory segment, and so on.
> Specific algorithms decide the actual MemorySegment to serve the operation 
> requests. For some rare cases, two or more memory segments serve the 
> requests. There are many operations based on such a paradigm, for example, 
> {{BinaryString#matchAt}}, {{SegmentsUtil#copyToBytes}}, 
> {{LongHashPartition#MatchIterator#get}}, etc.
> The problem is that, for memory segment array based operations, large amounts 
> of code is devoted to
> 1. Computing the memory segment index & offset within the memory segment.
>  2. Processing boundary cases. For example, to write an integer, there are 
> only 2 bytes left in the first memory segment, and the remaining 2 bytes must 
> be written to the next memory segment.
>  3. Differentiate processing for short/long data. For example, when copying 
> memory data to a byte array. Different methods are implemented for cases when 
> 1) the data fits in a single segment; 2) the data spans multiple segments.
> Therefore, there are much duplicated code to achieve above purposes. What is 
> worse, this paradigm significantly increases the amount of code, making the 
> code more difficult to read and maintain. Furthermore, it easily gives rise 
> to bugs which difficult to find and debug.
> To address these problems, we propose a new type of memory segment: 
> {{ContainerMemorySegment}}. It is based on an array of underlying memory 
> segments with the same size. It extends from the {{MemorySegment}} base 
> class, so it provides all the functionalities provided by {{MemorySegment}}. 
> In addition, it hides all the details for dealing with specific memory 
> segments, and acts as if it were a big continuous memory region.
> A prototype implementation is given below:
>  !image-2019-06-18-17-59-42-136.png|thumbnail! 
> With this new type of memory segment, many operations/algorithms can be 
> greatly simplified, without affecting performance. This is because,
> 1. Many checks, boundary processing are already there. We just move them to 
> the new class.
>  2. We optimize the implementation of the new class, so the special 
> optimizations (e.g. optimizations for short data) are still preserved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12900) Refactor the class hierarchy for BinaryFormat

2019-06-19 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12900:


 Summary: Refactor the class hierarchy for BinaryFormat
 Key: FLINK-12900
 URL: https://issues.apache.org/jira/browse/FLINK-12900
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Reporter: Liya Fan
Assignee: Liya Fan


There are many classes in the class hierarchy of BinaryFormat. They share the 
same memory format:

header + nullable bits + fixed-length part + variable-length part

Many operations can therefore be applied to a number of sub-classes. Currently, 
such operations are implemented in each sub-class, even though they implement 
identical functionality. 

This makes the code hard to understand and maintain.

In this proposal, we refactor the class hierarchy and move common operations 
into the base class, leaving only one implementation for each common operation. 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12886) Support container memory segment

2019-06-19 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16867424#comment-16867424
 ] 

Liya Fan commented on FLINK-12886:
--

[~ykt836] Thanks a lot for the article. I will take a look.

> Support container memory segment
> 
>
> Key: FLINK-12886
> URL: https://issues.apache.org/jira/browse/FLINK-12886
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2019-06-18-17-59-42-136.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We observe that in many scenarios, the operations/algorithms are based on an 
> array of MemorySegment. These memory segments form a large, combined, and 
> continuous memory space.
> For example, suppose we have an array of n memory segments. Memory addresses 
> from 0 to segment_size - 1 are served by the first memory segment; memory 
> addresses from segment_size to 2 * segment_size - 1 are served by the second 
> memory segment, and so on.
> Specific algorithms decide the actual MemorySegment to serve the operation 
> requests. For some rare cases, two or more memory segments serve the 
> requests. There are many operations based on such a paradigm, for example, 
> {{BinaryString#matchAt}}, {{SegmentsUtil#copyToBytes}}, 
> {{LongHashPartition#MatchIterator#get}}, etc.
> The problem is that, for memory segment array based operations, large amounts 
> of code is devoted to
> 1. Computing the memory segment index & offset within the memory segment.
>  2. Processing boundary cases. For example, to write an integer, there are 
> only 2 bytes left in the first memory segment, and the remaining 2 bytes must 
> be written to the next memory segment.
>  3. Differentiate processing for short/long data. For example, when copying 
> memory data to a byte array. Different methods are implemented for cases when 
> 1) the data fits in a single segment; 2) the data spans multiple segments.
> Therefore, there are much duplicated code to achieve above purposes. What is 
> worse, this paradigm significantly increases the amount of code, making the 
> code more difficult to read and maintain. Furthermore, it easily gives rise 
> to bugs which difficult to find and debug.
> To address these problems, we propose a new type of memory segment: 
> {{ContainerMemorySegment}}. It is based on an array of underlying memory 
> segments with the same size. It extends from the {{MemorySegment}} base 
> class, so it provides all the functionalities provided by {{MemorySegment}}. 
> In addition, it hides all the details for dealing with specific memory 
> segments, and acts as if it were a big continuous memory region.
> A prototype implementation is given below:
>  !image-2019-06-18-17-59-42-136.png|thumbnail! 
> With this new type of memory segment, many operations/algorithms can be 
> greatly simplified, without affecting performance. This is because,
> 1. Many checks, boundary processing are already there. We just move them to 
> the new class.
>  2. We optimize the implementation of the new class, so the special 
> optimizations (e.g. optimizations for short data) are still preserved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12886) Support container memory segment

2019-06-19 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16867398#comment-16867398
 ] 

Liya Fan commented on FLINK-12886:
--

Hi [~lzljs3620320], thanks for your comments. Please see my comments in-line:

{color:#59afe1}"First, I don't think one more MemorySegment implementation is a 
good thing. You can take a look to the comments of MemorySegment, For best 
efficiency, the code that uses this class should make sure that only one 
subclass is loaded, or that the methods that are abstract in this class are 
used only from one of the subclasses."{color}

- Sure. This is the greatest obstacle for this proposal. A new type of 
MemorySegment would introduce a performance penalty. The question is how great 
the penalty is, and whether the benefits introduced by this proposal justify 
the overhead.

{color:#59afe1}Second, The segment size may be not a power of 2 in BinaryString 
and BinaryRow. You can see BinaryRowSerializer.deserialize.{color}

 

- We can cut the segment into chunks whose sizes are powers of 2. Note that 
the power-of-2 segment size is only meant to improve performance, so it is 
just something nice to have.

 

{color:#59afe1}Third, in LongHashPartition, the calculated offset will reuse to 
getLong. And it doesn't have that much code.{color}

 

- In my opinion, there are many places in the code base where a simple 
operation is made complicated, which makes the code hard to understand and 
maintain. I think you know more about this than I do.

> Support container memory segment
> 
>
> Key: FLINK-12886
> URL: https://issues.apache.org/jira/browse/FLINK-12886
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2019-06-18-17-59-42-136.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We observe that in many scenarios, the operations/algorithms are based on an 
> array of MemorySegment. These memory segments form a large, combined, and 
> continuous memory space.
> For example, suppose we have an array of n memory segments. Memory addresses 
> from 0 to segment_size - 1 are served by the first memory segment; memory 
> addresses from segment_size to 2 * segment_size - 1 are served by the second 
> memory segment, and so on.
> Specific algorithms decide the actual MemorySegment to serve the operation 
> requests. For some rare cases, two or more memory segments serve the 
> requests. There are many operations based on such a paradigm, for example, 
> {{BinaryString#matchAt}}, {{SegmentsUtil#copyToBytes}}, 
> {{LongHashPartition#MatchIterator#get}}, etc.
> The problem is that, for memory segment array based operations, a large amount 
> of code is devoted to
> 1. Computing the memory segment index & offset within the memory segment.
>  2. Processing boundary cases. For example, to write an integer, there are 
> only 2 bytes left in the first memory segment, and the remaining 2 bytes must 
> be written to the next memory segment.
>  3. Differentiating processing for short/long data. For example, when copying 
> memory data to a byte array, different methods are implemented for cases when 
> 1) the data fits in a single segment; 2) the data spans multiple segments.
> Therefore, there is much duplicated code to achieve the above purposes. What is 
> worse, this paradigm significantly increases the amount of code, making the 
> code more difficult to read and maintain. Furthermore, it easily gives rise 
> to bugs which are difficult to find and debug.
> To address these problems, we propose a new type of memory segment: 
> {{ContainerMemorySegment}}. It is based on an array of underlying memory 
> segments with the same size. It extends from the {{MemorySegment}} base 
> class, so it provides all the functionalities provided by {{MemorySegment}}. 
> In addition, it hides all the details for dealing with specific memory 
> segments, and acts as if it were a big continuous memory region.
> A prototype implementation is given below:
>  !image-2019-06-18-17-59-42-136.png|thumbnail! 
> With this new type of memory segment, many operations/algorithms can be 
> greatly simplified, without affecting performance. This is because,
> 1. Many checks, boundary processing are already there. We just move them to 
> the new class.
>  2. We optimize the implementation of the new class, so the special 
> optimizations (e.g. optimizations for short data) are still preserved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12886) Support container memory segment

2019-06-18 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12886:


 Summary: Support container memory segment
 Key: FLINK-12886
 URL: https://issues.apache.org/jira/browse/FLINK-12886
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Runtime
Reporter: Liya Fan
Assignee: Liya Fan
 Attachments: image-2019-06-18-17-59-42-136.png

We observe that in many scenarios, the operations/algorithms are based on an 
array of MemorySegment. These memory segments form a large, combined, and 
continuous memory space.

For example, suppose we have an array of n memory segments. Memory addresses 
from 0 to segment_size - 1 are served by the first memory segment; memory 
addresses from segment_size to 2 * segment_size - 1 are served by the second 
memory segment, and so on.

Specific algorithms decide the actual MemorySegment to serve the operation 
requests. For some rare cases, two or more memory segments serve the requests. 
There are many operations based on such a paradigm, for example, 
{{BinaryString#matchAt}}, {{SegmentsUtil#copyToBytes}}, 
{{LongHashPartition#MatchIterator#get}}, etc.

The problem is that, for memory segment array based operations, a large amount 
of code is devoted to

1. Computing the memory segment index & offset within the memory segment.
 2. Processing boundary cases. For example, to write an integer, there are only 
2 bytes left in the first memory segment, and the remaining 2 bytes must be 
written to the next memory segment.
 3. Differentiating processing for short/long data. For example, when copying 
memory data to a byte array, different methods are implemented for cases when 
1) the data fits in a single segment; 2) the data spans multiple segments.

Therefore, there is much duplicated code to achieve the above purposes. What is 
worse, this paradigm significantly increases the amount of code, making the 
code more difficult to read and maintain. Furthermore, it easily gives rise to 
bugs which are difficult to find and debug.

To address these problems, we propose a new type of memory segment: 
{{ContainerMemorySegment}}. It is based on an array of underlying memory 
segments with the same size. It extends from the {{MemorySegment}} base class, 
so it provides all the functionalities provided by {{MemorySegment}}. In 
addition, it hides all the details for dealing with specific memory segments, 
and acts as if it were a big continuous memory region.

A prototype implementation is given below:

 !image-2019-06-18-17-59-42-136.png|thumbnail! 

With this new type of memory segment, many operations/algorithms can be greatly 
simplified, without affecting performance. This is because,

1. Many checks, boundary processing are already there. We just move them to the 
new class.
 2. We optimize the implementation of the new class, so the special 
optimizations (e.g. optimizations for short data) are still preserved.
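A self-contained sketch of the addressing idea (not the actual prototype from the attachment); byte[][] stands in for MemorySegment[], the class name is hypothetical, and a power-of-2 segment size is assumed:

{code:java}
final class ContainerSegmentSketch {
    private final byte[][] segments;
    private final int segmentSizeBits;  // log2(segmentSize)
    private final int segmentSizeMask;  // segmentSize - 1

    ContainerSegmentSketch(byte[][] segments, int segmentSize) {
        this.segments = segments;
        this.segmentSizeBits = Integer.numberOfTrailingZeros(segmentSize);
        this.segmentSizeMask = segmentSize - 1;
    }

    // Callers address the combined region as if it were one large byte array;
    // the index/offset computation lives in exactly one place.
    byte get(int address) {
        int segmentIndex = address >>> segmentSizeBits;
        int offsetInSegment = address & segmentSizeMask;
        return segments[segmentIndex][offsetInSegment];
    }

    // Boundary case handled once: an int that may span two underlying segments.
    int getInt(int address) {
        int result = 0;
        for (int i = 0; i < 4; i++) {
            result |= (get(address + i) & 0xFF) << (8 * i);
        }
        return result;
    }
}
{code}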



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12879) Improve the performance of AbstractBinaryWriter

2019-06-17 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12879:


 Summary: Improve the performance of AbstractBinaryWriter
 Key: FLINK-12879
 URL: https://issues.apache.org/jira/browse/FLINK-12879
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Reporter: Liya Fan
Assignee: Liya Fan


Improve the performance of AbstractBinaryWriter by:
1. removing unnecessary memory copies
2. improving the performance of rounding the buffer size.
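A hedged illustration of typical buffer-size rounding tricks (the exact rounding rule used by AbstractBinaryWriter may differ):

{code:java}
final class RoundingSketch {

    // Round up to the next power of two without a loop (valid for 0 < size <= 2^30).
    static int roundUpToPowerOfTwo(int size) {
        int n = size - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return n + 1;
    }

    // Round up to a multiple of 8 with a mask instead of a division.
    static int roundUpToMultipleOf8(int size) {
        return (size + 7) & ~7;
    }
}
{code}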



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12811) flink-table-planner-blink compile error

2019-06-11 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861775#comment-16861775
 ] 

Liya Fan commented on FLINK-12811:
--

I tried two different versions of JDK1.8, but could not reproduce this problem.

> flink-table-planner-blink compile error
> ---
>
> Key: FLINK-12811
> URL: https://issues.apache.org/jira/browse/FLINK-12811
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: vinoyang
>Priority: Major
>
> {code:java}
> 10:32:18.054 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile 
> (default-compile) on project flink-table-planner-blink_2.11: Compilation 
> failure: Compilation failure:
> 10:32:18.054 [ERROR] 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java:[35,35]
>  cannot find symbol
> 10:32:18.055 [ERROR] symbol:   class JavaScalaConversionUtil
> 10:32:18.055 [ERROR] location: package org.apache.flink.table.util
> 10:32:18.057 [ERROR] 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java:[132,33]
>  incompatible types: org.apache.calcite.plan.RelOptPlanner is not a 
> functional interface
> 10:32:18.058 [ERROR] multiple non-overriding abstract methods found in 
> interface org.apache.calcite.plan.RelOptPlanner
> 10:32:18.058 [ERROR] 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java:[174,24]
>  cannot find symbol
> 10:32:18.058 [ERROR] symbol:   variable JavaScalaConversionUtil
> 10:32:18.059 [ERROR] location: class 
> org.apache.flink.table.planner.PlannerContext
> 10:32:18.060 [ERROR] 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java:[190,24]
>  cannot find symbol
> 10:32:18.061 [ERROR] symbol:   variable JavaScalaConversionUtil
> 10:32:18.061 [ERROR] location: class 
> org.apache.flink.table.planner.PlannerContext
> 10:32:18.062 [ERROR] 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java:[204,24]
>  cannot find symbol
> 10:32:18.062 [ERROR] symbol:   variable JavaScalaConversionUtil
> 10:32:18.063 [ERROR] location: class 
> org.apache.flink.table.planner.PlannerContext
> 10:32:18.063 [ERROR] 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java:[219,86]
>  incompatible types: 
> scala.collection.mutable.ListBuffer 
> cannot be converted to java.util.List
> {code}
> log details: [https://api.travis-ci.org/v3/job/544108356/log.txt]
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12750) Gettting Started - Docker Playground - Interactive SQL Playground

2019-06-05 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12750:
-
Component/s: Documentation

> Gettting Started - Docker Playground - Interactive SQL Playground
> -
>
> Key: FLINK-12750
> URL: https://issues.apache.org/jira/browse/FLINK-12750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Konstantin Knauf
>Priority: Major
>
> The planned structure for the new Getting Started Guide is
> * Flink Overview (~ two pages)
> * Project Setup
> * Quickstarts
> ** Example Walkthrough - Table API / SQL
> ** Example Walkthrough - DataStream API
> * Docker Playgrounds
> ** Flink Cluster Playground
> ** Flink Interactive SQL Playground
> In this ticket we add the Flink Cluster Playground, a docker-compose based 
> setup consisting of Apache Kafka and Apache Flink (Flink Session Cluster) and 
> an SQL-Client. 
> The general setup should be in line with FLINK-12749. 
> **Open Questions**
> * Where to host the SQL Client image? Can we somehow also use existing plain 
> Flink images?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12749) Getting Started - Docker Playgrounds - Flink Cluster Playground

2019-06-05 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12749:
-
Component/s: Documentation

> Getting Started - Docker Playgrounds - Flink Cluster Playground
> ---
>
> Key: FLINK-12749
> URL: https://issues.apache.org/jira/browse/FLINK-12749
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Major
>
> The planned structure for the new Getting Started Guide is
> * Flink Overview (~ two pages)
> * Project Setup
> * Quickstarts
> ** Example Walkthrough - Table API / SQL
> ** Example Walkthrough - DataStream API
> * Docker Playgrounds
> ** Flink Cluster Playground
> ** Flink Interactive SQL Playground
> In this ticket we add the Flink Cluster Playground, a docker-compose based 
> setup consisting of Apache Kafka and Apache Flink (Flink Session Cluster), 
> including a step-by-step guide for some common commands (job submission, 
> savepoints, etc).
> *Some Open Questions:*
> * Which Flink images to use? `library/flink` with dynamic properties would be 
> the most maintainable, I think. It would be preferable, if we don't need to 
> host any custom images for this, but can rely on the existing plain Flink 
> images.
> * Which Flink jobs to use? An updated version 
> {{org.apache.flink.streaming.examples.statemachine.StateMachineExample}} 
> might be a good option as it can run with or without Kafka and contains a data 
> generator writing to Kafka already (see next questions).
> * How to get data into Kafka? Maybe just provide a small bash 
> script/one-liner to produce into Kafka topic or see question above.
> * Which Kafka Images to use? https://hub.docker.com/r/wurstmeister/kafka/ 
> seems to be well-maintained and is openly available.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12748) Getting Started - Flink Overview

2019-06-05 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12748:
-
Component/s: Documentation

> Getting Started - Flink Overview
> 
>
> Key: FLINK-12748
> URL: https://issues.apache.org/jira/browse/FLINK-12748
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Konstantin Knauf
>Priority: Major
>
> The planned structure for the new Getting Started Guide is 
> *  Flink Overview (~ two pages)
> * Project Setup
> * Quickstarts
> ** Example Walkthrough - Table API / SQL
> ** Example Walkthrough - DataStream API
> * Docker Playgrounds
> ** Flink Cluster Playground
> ** Flink Interactive SQL Playground
> In this ticket we should add a 1-2 page introduction & overview of Apache 
> Flink including among other things an overview of the API Stack (DataStream 
> API & SQL/Table API), an introduction to *stateful* stream processing and 
> Flink's role in an overall stream processing architecture.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12747) Getting Started - Table API Example Walkthrough

2019-06-05 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12747:
-
Component/s: Documentation

> Getting Started - Table API Example Walkthrough
> ---
>
> Key: FLINK-12747
> URL: https://issues.apache.org/jira/browse/FLINK-12747
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Konstantin Knauf
>Priority: Major
>
> The planned structure for the new Getting Started Guide is 
> *  Flink Overview (~ two pages)
> * Project Setup
> * Quickstarts
> ** Example Walkthrough - Table API / SQL
> ** Example Walkthrough - DataStream API
> * Docker Playgrounds
> ** Flink Cluster Playground
> ** Flink Interactive SQL Playground
> This tickets adds the Example Walkthrough for the Table API, which should 
> follow the same structure as the DataStream Example (FLINK-12746), which 
> needs to be completed first.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12746) Getting Started - Project Setup and DataStream Example Walkthrough

2019-06-05 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12746:
-
Component/s: Documentation

> Getting Started - Project Setup and DataStream Example Walkthrough
> --
>
> Key: FLINK-12746
> URL: https://issues.apache.org/jira/browse/FLINK-12746
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Konstantin Knauf
>Priority: Major
>
> The planned structure for the new Getting Started Guide is 
> *  Flink Overview (~ two pages)
> * Project Setup
> * Quickstarts
> ** Example Walkthrough - Table API / SQL
> ** Example Walkthrough - DataStream API
> * Docker Playgrounds
> ** Flink Cluster Playground
> ** Flink Interactive SQL Playground
> In this ticket we should add "Project Setup" and "Quickstarts -> Example 
> Walkthrough - DataStream API", which covers everything we have today. 
> This will replace the current "Tutorials" and "Examples" section, which can 
> be removed as part of this ticket as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12730) Combine BitSet implementations in flink-runtime

2019-06-04 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12730:


 Summary: Combine BitSet implementations in flink-runtime
 Key: FLINK-12730
 URL: https://issues.apache.org/jira/browse/FLINK-12730
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Liya Fan
Assignee: Liya Fan


There are two implementations of BitSet in the flink-runtime component: one is 
org.apache.flink.runtime.operators.util.BloomFilter#BitSet, while the other is 
org.apache.flink.runtime.operators.util.BitSet.

The two classes are quite similar in their API and implementations. The only 
difference is that the former is based on long operations while the latter is 
based on byte operations. This has the following consequences:
 # The byte based BitSet has better performance for get/set operations.
 # The long based BitSet has better performance for the clear operation.

We combine the two implementations to get the best of both worlds.
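A self-contained sketch of how the two behaviours can coexist, with byte-granularity get/set and long-granularity clear; ByteBuffer stands in for Flink's MemorySegment and the class name is hypothetical:

{code:java}
import java.nio.ByteBuffer;

final class CombinedBitSetSketch {
    private final ByteBuffer buffer;

    CombinedBitSetSketch(int bitCapacity) {
        int byteLength = (bitCapacity + 7) / 8;
        // Round up to a multiple of 8 so clear() can use long stores.
        this.buffer = ByteBuffer.allocate((byteLength + 7) & ~7);
    }

    // Single-byte load/store, like the byte based implementation.
    void set(int index) {
        int byteIndex = index >>> 3;
        byte current = buffer.get(byteIndex);
        buffer.put(byteIndex, (byte) (current | (1 << (index & 7))));
    }

    boolean get(int index) {
        return (buffer.get(index >>> 3) & (1 << (index & 7))) != 0;
    }

    // Clear eight bytes per store, like the long based implementation.
    void clear() {
        for (int i = 0; i < buffer.capacity(); i += 8) {
            buffer.putLong(i, 0L);
        }
    }
}
{code}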

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12687) ByteHashSet is always in dense mode

2019-06-03 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12687:
-
Component/s: (was: Runtime / Operators)
 Table SQL / Runtime

> ByteHashSet is always in dense mode
> ---
>
> Key: FLINK-12687
> URL: https://issues.apache.org/jira/browse/FLINK-12687
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Since there are only 256 possible byte values, the largest possible range is 
> 255, and the condition 
> range < OptimizableHashSet.DENSE_THRESHOLD
> must be satisfied. So ByteHashSet must be in dense mode.
> We can make use of this to improve the performance and code structure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12701) Column name alias causes exception when used with where and group-by

2019-06-02 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16854154#comment-16854154
 ] 

Liya Fan commented on FLINK-12701:
--

Hi [~josh.bradt], thanks for reporting this problem.

This seems to be a real bug in Flink. Would you please provide more details to 
reproduce the problem, like the table schema and sample data?

> Column name alias causes exception when used with where and group-by
> 
>
> Key: FLINK-12701
> URL: https://issues.apache.org/jira/browse/FLINK-12701
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.8.0
>Reporter: Josh Bradt
>Priority: Major
>
> Assigning a column an alias containing a space sometimes causes an exception 
> even though the docs suggest this is valid.
> Assume we have a table {{Groups}} that contains a string-typed column called 
> {{name}}. Then the query
> {code:sql}
> SELECT `Groups`.`name` AS `Group Name` FROM `Groups`
> {code}
> works as expected, but
> {code:sql}
> SELECT `Groups`.`name` AS `Group Name` 
> FROM `Groups` 
> WHERE `Groups`.`name` LIKE 'Treat%' 
> ORDER BY `Groups`.`name` ASC
> {code}
> produces the following exception
> {code:java}
> org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException:
>  Invalid tuple field reference "Group Name".
>   at 
> org.apache.flink.api.java.typeutils.RowTypeInfo.getFlatFields(RowTypeInfo.java:97)
>   at 
> org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:266)
>   at 
> org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:223)
>   at org.apache.flink.api.java.DataSet.partitionByRange(DataSet.java:1298)
>   at 
> org.apache.flink.table.plan.nodes.dataset.DataSetSort.translateToPlan(DataSetSort.scala:99)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:311)
>   at 
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
>   at org.apache.flink.table.api.Table.insertInto(table.scala:1126)
>   [...]
> {code}
> If you remove the {{WHERE}} clause or the {{ORDER BY}} clause, it works fine. 
> It only fails when both are included. Additionally, it works fine if the 
> column alias ({{AS `Group Name`}}) is removed or if it doesn't contain a 
> space ({{AS `GroupName`}}).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12687) ByteHashSet is always in dense mode

2019-05-30 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12687:


 Summary: ByteHashSet is always in dense mode
 Key: FLINK-12687
 URL: https://issues.apache.org/jira/browse/FLINK-12687
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Operators
Reporter: Liya Fan
Assignee: Liya Fan


Since there are only 256 possible byte values, the largest possible range is 
255, and the condition 

range < OptimizableHashSet.DENSE_THRESHOLD

must be satisfied. So ByteHashSet must be in dense mode.

We can make use of this to improve the performance and code structure.
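A minimal sketch of the dense layout this implies (not the actual ByteHashSet code): membership becomes a direct lookup into a 256-slot table.

{code:java}
final class DenseByteSetSketch {
    // One slot per possible byte value (-128..127).
    private final boolean[] used = new boolean[256];

    void add(byte value) {
        used[value - Byte.MIN_VALUE] = true;   // map -128..127 to 0..255
    }

    boolean contains(byte value) {
        return used[value - Byte.MIN_VALUE];
    }
}
{code}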



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12620) Deadlock in task deserialization

2019-05-28 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850307#comment-16850307
 ] 

Liya Fan commented on FLINK-12620:
--

[~mikekap], thanks for the clarification.

Would you please give more details about the Tasks and A, B, C? I want to know 
whether this is a general case or a special case.

> Deadlock in task deserialization
> 
>
> Key: FLINK-12620
> URL: https://issues.apache.org/jira/browse/FLINK-12620
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.8.0
>Reporter: Mike Kaplinskiy
>Priority: Major
> Attachments: jstack_snippet.txt
>
>
> When running a batch job, I ran into an issue where task deserialization 
> caused a deadlock. Specifically, if you have a static initialization 
> dependency graph that looks like this (these are all classes):
> {code:java}
> Task1 depends on A
> A depends on B
> B depends on C
> C depends on B [cycle]
> Task2 depends on C{code}
> What seems to happen is a deadlock. Specifically, threads are started on the 
> task managers that simultaneously call BatchTask.instantiateUserCode on both 
> Task1 and Task2. This starts deserializing the classes and initializing them. 
> Here's the deadlock scenario, as a stack:
> {code:java}
> Time>
> T1: [deserialize] -> Task1 -> A -> B -> (wait for 
> C)
> T2: [deserialize] -> Task2              -> C -> (wait for 
> B){code}
>  
> A similar scenario from the web: 
> [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] .
>  
> For my specific problem, I'm running into this within Clojure - 
> {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with 
> {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. 
> Deserializing different clojure functions calls one or the other first which 
> deadlocks task managers.
>  
> I built a version of flink-core that had 
> {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} 
> synchronized, but I'm not sure that it's the proper fix. I'm happy to submit 
> that as a patch, but I'm not familiar enough with the codebase to say that 
> it's the correct solution - ideally all Java class loading is synchronized, 
> but I'm not sure how to do that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12620) Deadlock in task deserialization

2019-05-28 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849567#comment-16849567
 ] 

Liya Fan commented on FLINK-12620:
--

[~mikekap], thanks for providing additional details. 

I don't quite understand why making the method synchronized solves the problem.

In the example you provided, if T1 enters the synchronization block first, does 
it have to wait for C? If so, it seems the deadlock still exists. If 
not, why?
Time>
T1: [deserialize] -> Task1 -> A -> B -> (wait for C)
T2: [deserialize] -> Task2 -> C -> (wait for B)
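For reference, the underlying JVM behaviour can be reproduced without Flink; a minimal sketch, using hypothetical classes that mirror the B/C cycle above:

{code:java}
// Self-contained demonstration of a class-initialization deadlock.
class B {
    static { C.touch(); }   // B's initializer triggers initialization of C
    static void touch() { }
}

class C {
    static { B.touch(); }   // C's initializer triggers initialization of B
    static void touch() { }
}

public class InitDeadlockDemo {
    public static void main(String[] args) {
        new Thread(() -> B.touch()).start();  // starts initializing B, then waits for C
        new Thread(() -> C.touch()).start();  // starts initializing C, then waits for B
        // With unlucky timing, both threads block forever on the JVM's
        // per-class initialization locks; no Java-level lock is involved.
    }
}
{code}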

> Deadlock in task deserialization
> 
>
> Key: FLINK-12620
> URL: https://issues.apache.org/jira/browse/FLINK-12620
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.8.0
>Reporter: Mike Kaplinskiy
>Priority: Major
> Attachments: jstack_snippet.txt
>
>
> When running a batch job, I ran into an issue where task deserialization 
> caused a deadlock. Specifically, if you have a static initialization 
> dependency graph that looks like this (these are all classes):
> {code:java}
> Task1 depends on A
> A depends on B
> B depends on C
> C depends on B [cycle]
> Task2 depends on C{code}
> What seems to happen is a deadlock. Specifically, threads are started on the 
> task managers that simultaneously call BatchTask.instantiateUserCode on both 
> Task1 and Task2. This starts deserializing the classes and initializing them. 
> Here's the deadlock scenario, as a stack:
> {code:java}
> Time>
> T1: [deserialize] -> Task1 -> A -> B -> (wait for 
> C)
> T2: [deserialize] -> Task2              -> C -> (wait for 
> B){code}
>  
> A similar scenario from the web: 
> [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] .
>  
> For my specific problem, I'm running into this within Clojure - 
> {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with 
> {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. 
> Deserializing different clojure functions calls one or the other first which 
> deadlocks task managers.
>  
> I built a version of flink-core that had 
> {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} 
> synchronized, but I'm not sure that it's the proper fix. I'm happy to submit 
> that as a patch, but I'm not familiar enough with the codebase to say that 
> it's the correct solution - ideally all Java class loading is synchronized, 
> but I'm not sure how to do that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12620) Deadlock in task deserialization

2019-05-27 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848865#comment-16848865
 ] 

Liya Fan commented on FLINK-12620:
--

[~mikekap] Thank you for finding this problem.

Would you please give more details about the problem, like the example code, or 
your patch?

> Deadlock in task deserialization
> 
>
> Key: FLINK-12620
> URL: https://issues.apache.org/jira/browse/FLINK-12620
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.8.0
>Reporter: Mike Kaplinskiy
>Priority: Major
>
> When running a batch job, I ran into an issue where task deserialization 
> caused a deadlock. Specifically, if you have a static initialization 
> dependency graph that looks like this (these are all classes):
> {code:java}
> Task1 depends on A
> A depends on B
> B depends on C
> C depends on B [cycle]
> Task2 depends on C{code}
> What seems to happen is a deadlock. Specifically, threads are started on the 
> task managers that simultaneously call BatchTask.instantiateUserCode on both 
> Task1 and Task2. This starts deserializing the classes and initializing them. 
> Here's the deadlock scenario, as a stack:
> {code:java}
> Time>
> T1: [deserialize] -> Task1 -> A -> B -> (wait for 
> C)
> T2: [deserialize] -> Task2              -> C -> (wait for 
> B){code}
>  
> A similar scenario from the web: 
> [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] .
>  
> For my specific problem, I'm running into this within Clojure - 
> {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with 
> {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. 
> Deserializing different clojure functions calls one or the other first which 
> deadlocks task managers.
>  
> I built a version of flink-core that had 
> {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} 
> synchronized, but I'm not sure that it's the proper fix. I'm happy to submit 
> that as a patch, but I'm not familiar enough with the codebase to say that 
> it's the correct solution - ideally all Java class loading is synchronized, 
> but I'm not sure how to do that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12593) Revise the document for CEP

2019-05-22 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846399#comment-16846399
 ] 

Liya Fan commented on FLINK-12593:
--

Hi [~jark], thank you so much for sharing the good news and the documents.

I will take a closer look at the documents. However, I guess I also need to get 
more knowledge about CEP, before I can give some useful comments about it :)

> Revise the document for CEP
> ---
>
> Key: FLINK-12593
> URL: https://issues.apache.org/jira/browse/FLINK-12593
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Liya Fan
>Priority: Minor
>
> The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to 
> understand and follow, especially for beginners.
> I suggest revising from the following aspects:
> 1. Give more detailed descriptions of existing examples.
> 2. More examples are required to illustrate the features.
> 3. More explanations are required for some concepts, like contiguity.
> 4. We can add more references to better understand the concepts. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12586) Stderr and stdout are reversed in OptimizerPlanEnvironment

2019-05-22 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846381#comment-16846381
 ] 

Liya Fan commented on FLINK-12586:
--

Hi [~shinhira_kazunori], thanks for finding this problem. I have provided a 
fix. Please take a look.

> Stderr and stdout are reversed in OptimizerPlanEnvironment
> --
>
> Key: FLINK-12586
> URL: https://issues.apache.org/jira/browse/FLINK-12586
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Kazunori Shinhira
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the OptimizerPlanEnvironment#getOptimizedPlan method, it looks like stdout 
> is output as System.err and stderr is output as System.out.
> [https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L107-L108]
>  
> I think it should be as below.
> {code:java}
> throw new ProgramInvocationException(
> "The program plan could not be fetched - the program aborted pre-maturely."
> + "\n\nSystem.err: " + (stdout.length() == 0 ? "(none)" : stderr)
> + "\n\nSystem.out: " + (stderr.length() == 0 ? "(none)" : stdout));
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12593) Revise the document for CEP

2019-05-22 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12593:


 Summary: Revise the document for CEP
 Key: FLINK-12593
 URL: https://issues.apache.org/jira/browse/FLINK-12593
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Liya Fan


The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to 
understand and follow, especially for beginners.

I suggest revising from the following aspects:

1. Give more detailed descriptions of existing examples.

2. More examples are required to illustrate the features.

3. More explanations are required for some concepts, like contiguity.

4. We can add more references to better understand the concepts. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer

2019-05-22 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845757#comment-16845757
 ] 

Liya Fan commented on FLINK-12319:
--

[~mpf] The problem has been fixed by our 
[PR|https://github.com/apache/flink/pull/8511]. Would you please take a look?

> StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
> ---
>
> Key: FLINK-12319
> URL: https://issues.apache.org/jira/browse/FLINK-12319
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
> Environment: Ubuntu 18.04
> openjdk version "1.8.0_191"
> OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12)
> OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)
>Reporter: Marco Pfatschbacher
>Assignee: Liya Fan
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
>  
> I wrote a simple SourceFunction that creates Events in a loop.
> The CEP pattern is very simple:
> {code:java}
>   final Pattern failurePattern =
>   Pattern.begin("5 or more failures", 
> AfterMatchSkipStrategy.skipPastLastEvent())
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("failed");
>   }
>   })
>   .times(5)
>   .next("1 or more successes")
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("success");
>   }
>   })
>   .times(1)
>   .within(Time.milliseconds(20));
> {code}
>  
> After about 100k Events, Flink aborts with this stacktrace:
>  
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
>     at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263)
>     at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41)
> Caused by: java.lang.StackOverflowError
>     at 
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>     at 
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> 

[jira] [Assigned] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer

2019-05-22 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan reassigned FLINK-12319:


Assignee: Liya Fan

> StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
> ---
>
> Key: FLINK-12319
> URL: https://issues.apache.org/jira/browse/FLINK-12319
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
> Environment: Ubuntu 18.04
> openjdk version "1.8.0_191"
> OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12)
> OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)
>Reporter: Marco Pfatschbacher
>Assignee: Liya Fan
>Priority: Major
>
>  
> I wrote a simple SourceFunction that creates Events in a loop.
> The CEP pattern is very simple:
> {code:java}
>   final Pattern failurePattern =
>   Pattern.begin("5 or more failures", 
> AfterMatchSkipStrategy.skipPastLastEvent())
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("failed");
>   }
>   })
>   .times(5)
>   .next("1 or more successes")
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("success");
>   }
>   })
>   .times(1)
>   .within(Time.milliseconds(20));
> {code}
>  
> After about 100k Events, Flink aborts with this stacktrace:
>  
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
>     at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263)
>     at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41)
> Caused by: java.lang.StackOverflowError
>     at 
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>     at 
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> 

[jira] [Assigned] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer

2019-05-22 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan reassigned FLINK-12319:


Assignee: (was: Liya Fan)

> StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
> ---
>
> Key: FLINK-12319
> URL: https://issues.apache.org/jira/browse/FLINK-12319
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
> Environment: Ubuntu 18.04
> openjdk version "1.8.0_191"
> OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12)
> OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)
>Reporter: Marco Pfatschbacher
>Priority: Major
>
>  
> I wrote a simple SourceFunction that creates Events in a loop.
> The CEP pattern is very simple:
> {code:java}
>   final Pattern failurePattern =
>   Pattern.begin("5 or more failures", 
> AfterMatchSkipStrategy.skipPastLastEvent())
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("failed");
>   }
>   })
>   .times(5)
>   .next("1 or more successes")
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("success");
>   }
>   })
>   .times(1)
>   .within(Time.milliseconds(20));
> {code}
>  
> After about 100k Events, Flink aborts with this stacktrace:
>  
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
>     at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263)
>     at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41)
> Caused by: java.lang.StackOverflowError
>     at 
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>     at 
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> 

[jira] [Assigned] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer

2019-05-22 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan reassigned FLINK-12319:


Assignee: Liya Fan

> StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
> ---
>
> Key: FLINK-12319
> URL: https://issues.apache.org/jira/browse/FLINK-12319
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
> Environment: Ubuntu 18.04
> openjdk version "1.8.0_191"
> OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12)
> OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)
>Reporter: Marco Pfatschbacher
>Assignee: Liya Fan
>Priority: Major
>
>  
> I wrote a simple SourceFunction that creates Events in a loop.
> The CEP pattern is very simple:
> {code:java}
>   final Pattern failurePattern =
>   Pattern.begin("5 or more failures", 
> AfterMatchSkipStrategy.skipPastLastEvent())
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("failed");
>   }
>   })
>   .times(5)
>   .next("1 or more successes")
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("success");
>   }
>   })
>   .times(1)
>   .within(Time.milliseconds(20));
> {code}
>  
> After about 100k Events, Flink aborts with this stacktrace:
>  
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
>     at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263)
>     at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41)
> Caused by: java.lang.StackOverflowError
>     at 
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>     at 
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> 

[jira] [Commented] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer

2019-05-22 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845566#comment-16845566
 ] 

Liya Fan commented on FLINK-12319:
--

Hi [~mpf], thank you for your feedback. I am looking at this issue.

> StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
> ---
>
> Key: FLINK-12319
> URL: https://issues.apache.org/jira/browse/FLINK-12319
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
> Environment: Ubuntu 18.04
> openjdk version "1.8.0_191"
> OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12)
> OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)
>Reporter: Marco Pfatschbacher
>Priority: Major
>
>  
> I wrote a simple SourceFunction that creates Events in a loop.
> The CEP pattern is very simple:
> {code:java}
>   final Pattern failurePattern =
>   Pattern.begin("5 or more failures", 
> AfterMatchSkipStrategy.skipPastLastEvent())
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("failed");
>   }
>   })
>   .times(5)
>   .next("1 or more successes")
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("success");
>   }
>   })
>   .times(1)
>   .within(Time.milliseconds(20));
> {code}
>  
> After about 100k Events, Flink aborts with this stacktrace:
>  
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
>     at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263)
>     at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41)
> Caused by: java.lang.StackOverflowError
>     at 
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>     at 
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> 

[jira] [Updated] (FLINK-12335) Remove useless code in SegmentsUtil

2019-05-21 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12335:
-
Summary: Remove useless code in SegmentsUtil  (was: Improvement the 
performance of class SegmentsUtil)

> Remove useless code in SegmentsUtil
> ---
>
> Key: FLINK-12335
> URL: https://issues.apache.org/jira/browse/FLINK-12335
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Improve the performance of class SegmentsUtil:
> To evaluate the offset, an integer is bitand with a mask to clear to low 
> bits, and then shift right. The bitand is useless:
> ((index & BIT_BYTE_POSITION_MASK) >>> 3)
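
For illustration, a minimal sketch of why the bitwise AND is redundant before
the unsigned right shift (assuming BIT_BYTE_POSITION_MASK clears the low three
bits, i.e. ~0x7; the constant's value is an assumption, not shown in this
issue):

{code:java}
public class MaskShiftDemo {
    // Assumption for this sketch: the mask clears the low 3 bits.
    private static final int BIT_BYTE_POSITION_MASK = ~0x7;

    public static void main(String[] args) {
        for (int index : new int[] {0, 5, 8, 1023, Integer.MAX_VALUE}) {
            int withMask = (index & BIT_BYTE_POSITION_MASK) >>> 3;
            int withoutMask = index >>> 3;
            // The low 3 bits are discarded by the shift anyway, so both results agree.
            System.out.println(index + ": " + withMask + " == " + withoutMask);
        }
    }
}
{code}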



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12553) Fix a bug in SqlDateTimeUtils#parseToTimeMillis

2019-05-20 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12553:
-
Description: If parameter "1999-12-31 12:34:56.123" is used, it should 
return 123. But it returns 1230 now.  (was: If parameter "1999-12-31 
12:34:56.123" is used, it should return 123. But it returns 230 now.)

> Fix a bug in SqlDateTimeUtils#parseToTimeMillis
> ---
>
> Key: FLINK-12553
> URL: https://issues.apache.org/jira/browse/FLINK-12553
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Trivial
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> If parameter "1999-12-31 12:34:56.123" is used, it should return 123. But it 
> returns 1230 now.
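
One way such a symptom can arise (purely hypothetical; this is not
SqlDateTimeUtils' actual code) is scaling the fractional part by a fixed power
of ten instead of by its digit count:

{code:java}
public class FractionMillisDemo {
    // Hypothetical buggy variant: scales as if the fraction had 4 digits, so ".123" becomes 1230.
    static int buggy(String fraction) {
        return Integer.parseInt(fraction) * (int) Math.pow(10, 4 - fraction.length());
    }

    // Expected behavior: a millisecond fraction has at most 3 digits, so ".123" stays 123.
    static int expected(String fraction) {
        return Integer.parseInt(fraction) * (int) Math.pow(10, 3 - fraction.length());
    }

    public static void main(String[] args) {
        System.out.println(buggy("123"));    // 1230
        System.out.println(expected("123")); // 123
    }
}
{code}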



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12553) Fix a bug in SqlDateTimeUtils#parseToTimeMillis

2019-05-20 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12553:


 Summary: Fix a bug in SqlDateTimeUtils#parseToTimeMillis
 Key: FLINK-12553
 URL: https://issues.apache.org/jira/browse/FLINK-12553
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: Liya Fan
Assignee: Liya Fan


If parameter "1999-12-31 12:34:56.123" is used, it should return 123. But it 
returns 230 now.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11421) Add compilation options to allow compiling generated code with JDK compiler

2019-05-06 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833687#comment-16833687
 ] 

Liya Fan commented on FLINK-11421:
--

[~lzljs3620320] Thanks a lot for your confirmation.

>> I think we should know what patterns can be improved and what patterns may 
>> not be improved.  (and how much has it improved)

This is an interesting question, and I would like to spend some effort on that. 

>> I mean you can run these tests manual, just test the JCA logical.

Sounds good. Thanks for your suggestion.

> Add compilation options to allow compiling generated code with JDK compiler 
> 
>
> Key: FLINK-11421
> URL: https://issues.apache.org/jira/browse/FLINK-11421
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 240h
>  Time Spent: 20m
>  Remaining Estimate: 239h 40m
>
> Flink supports some operators (like Calc, Hash Agg, Hash Join, etc.) by code 
> generation. That is, Flink generates their source code dynamically, and then 
> compile it into Java Byte Code, which is load and executed at runtime.
>  
> By default, Flink compiles the generated source code by Janino. This is fast, 
> as the compilation often finishes in hundreds of milliseconds. The generated 
> Java Byte Code, however, is of poor quality. To illustrate, we use Java 
> Compiler API (JCA) to compile the generated code. Experiments on TPC-H (1 TB) 
> queries show that the E2E time can be more than 10% shorter, when operators 
> are compiled by JCA, despite that it takes more time (a few seconds) to 
> compile with JCA.
>  
> Therefore, we believe it is beneficial to compile generated code by JCA in 
> the following scenarios: 1) For batch jobs, the E2E time is relatively long, 
> so it is worth of spending more time compiling and generating high quality 
> Java Byte Code. 2) For repeated stream jobs, the generated code will be 
> compiled once and run many times. Therefore, it pays to spend more time 
> compiling for the first time, and enjoy the high byte code qualities for 
> later runs.
>  
> According to the above observations, we want to provide a compilation option 
> (Janino, JCA, or dynamic) for Flink, so that the user can choose the one 
> suitable for their specific scenario and obtain better performance whenever 
> possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11421) Add compilation options to allow compiling generated code with JDK compiler

2019-05-06 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833664#comment-16833664
 ] 

Liya Fan commented on FLINK-11421:
--

Hi [~lzljs3620320], thanks a lot for your information. 
Can I restart the PR now? 

My comments in line:

1.Why is Java Compiler faster than Janino? any technical details and evidence?

Generally speaking, if a compiler takes longer to compile the code, the 
compilation results tend to have higher quality. This is because a compiler 
that takes longer usually applies more optimizations to the code.

Similarly, native-language compilers like gcc offer different optimization 
levels: -O0, -O1, -O2, and -O3. A higher optimization level means a longer 
compilation time, but the generated machine code has better quality.

2.Do some benchmark to measure how fast E2E run after compiling by JCA?

We first found that JCA could improve E2E performance when we were trying to 
support vectorization of TPC-H Q1. JCA compilation provided a performance 
improvement of about 27% (from 27-28s to 20s). This was also observed for some 
other TPC-H queries, such as Q12 and Q18.

3.Do some benchmark to measure how slowly JCA compiles?

Good question. It takes about 2s to finish a JCA compilation task, which is 
more than 10 times slower than compiling with Janino. To alleviate the 
performance impact, we introduce two improvements:

1) Compilation by chain: it seems the compilation time does not increase much 
as the number of source files increases. So we compile the source code for all 
operators in a chain in a single batch.

2) Class cache: once a source file is compiled, we save the result into a 
cache, so other tasks in the same JVM can reuse it (a minimal sketch follows 
these answers).

4.Open the JCA compiler to run all tests of table-planner?

Sounds reasonable. The only drawback is that the time for running the tests 
may become much longer, since compiling with JCA is much slower. 
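
For the class cache mentioned in point 3, a minimal sketch of the idea (the
class and method names here are hypothetical, not Flink's actual API):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class CompiledClassCache {
    // One cache per JVM, keyed by the generated source code.
    private static final Map<String, Class<?>> CACHE = new ConcurrentHashMap<>();

    static Class<?> getOrCompile(String sourceCode, Function<String, Class<?>> compileWithJca) {
        // Tasks in the same JVM reuse the result instead of paying the JCA cost again.
        return CACHE.computeIfAbsent(sourceCode, compileWithJca);
    }
}
{code}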

> Add compilation options to allow compiling generated code with JDK compiler 
> 
>
> Key: FLINK-11421
> URL: https://issues.apache.org/jira/browse/FLINK-11421
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 240h
>  Time Spent: 20m
>  Remaining Estimate: 239h 40m
>
> Flink supports some operators (like Calc, Hash Agg, Hash Join, etc.) by code 
> generation. That is, Flink generates their source code dynamically, and then 
> compile it into Java Byte Code, which is load and executed at runtime.
>  
> By default, Flink compiles the generated source code by Janino. This is fast, 
> as the compilation often finishes in hundreds of milliseconds. The generated 
> Java Byte Code, however, is of poor quality. To illustrate, we use Java 
> Compiler API (JCA) to compile the generated code. Experiments on TPC-H (1 TB) 
> queries show that the E2E time can be more than 10% shorter, when operators 
> are compiled by JCA, despite that it takes more time (a few seconds) to 
> compile with JCA.
>  
> Therefore, we believe it is beneficial to compile generated code by JCA in 
> the following scenarios: 1) For batch jobs, the E2E time is relatively long, 
> so it is worth of spending more time compiling and generating high quality 
> Java Byte Code. 2) For repeated stream jobs, the generated code will be 
> compiled once and run many times. Therefore, it pays to spend more time 
> compiling for the first time, and enjoy the high byte code qualities for 
> later runs.
>  
> According to the above observations, we want to provide a compilation option 
> (Janino, JCA, or dynamic) for Flink, so that the user can choose the one 
> suitable for their specific scenario and obtain better performance whenever 
> possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10929) Add support for Apache Arrow

2019-04-29 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16826677#comment-16826677
 ] 

Liya Fan edited comment on FLINK-10929 at 4/30/19 3:01 AM:
---

Hi [Stephan 
Ewen|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=StephanEwen] 
and [Kurt 
Young|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=ykt836], 
thanks a lot for your valuable comments. 

It seems there is not much debate for point 1). 

For point 2), I want to list a few advantages of using Arrow as internal data 
structures for Flink. These conclusions are based on our investigations of 
research papers, as well as our engineering experiences in recent development 
on Blink.
 # Arrow adopts a columnar data format, which makes it possible to apply SIMD 
([link|http://prestodb.rocks/code/simd/]). SIMD means higher performance. Java 
started to support SIMD in Java 8, and it is expected that higher versions of 
Java will provide better support for SIMD. 
 # Arrow provides a more compact memory layout. Simply put, encoding a batch of 
rows as Arrow vectors requires much less memory space than encoding them with 
BinaryRow. This, in turn, leads to two results:
 ## With the same amount of memory space, more data can be loaded into memory, 
and fewer spills are required.
 ## The cost of shuffling can be reduced significantly. For example, the amount 
of shuffled data for TPC-H Q4 is reduced to almost half. 
 # The columnar format makes data compression much easier (see 
[paper|http://db.lcs.mit.edu/projects/cstore/abadisigmod06.pdf], for example). 
Our initial experiments have shown some positive results.

There are more advantages of Arrow, but I only list a few due to space 
constraints. The TPC-H performance in the attachment speaks for itself.

We understand and respect that Blink is currently being merged, and some 
changes should be postponed. We would like to provide our devoted help in this 
process.

However, requiring many changes is not a good reason to reject Arrow if we 
want to make Flink a world-class computing engine. All these difficulties can 
be overcome. We have a plan to carry out the changes incrementally.

 

 


was (Author: fan_li_ya):
Hi [Stephan 
Ewen|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=StephanEwen] 
and [Kurt 
Young|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=ykt836], 
thanks a lot for your valuable comments. 

It seems there is not much debate for point 1). 

For point 2), I want to list a few advantages of using Arrow as internal data 
structures for Flink. These conclusions are based on our investigations of 
research papers, as well as our engineering experiences in secondary 
development on Blink.
 # Arrow adopts a columnar data format, which makes it possible to apply SIMD 
([link|http://prestodb.rocks/code/simd/]). SIMD means higher performance. Java 
started to support SIMD in Java 8, and it is expected that higher versions of 
Java will provide better support for SIMD. 
 # Arrow provides a more compact memory layout. Simply put, encoding a batch of 
rows as Arrow vectors requires much less memory space than encoding them with 
BinaryRow. This, in turn, leads to two results:
 ## With the same amount of memory space, more data can be loaded into memory, 
and fewer spills are required.
 ## The cost of shuffling can be reduced significantly. For example, the amount 
of shuffled data for TPC-H Q4 is reduced to almost half. 
 # The columnar format makes data compression much easier (see 
[paper|http://db.lcs.mit.edu/projects/cstore/abadisigmod06.pdf], for example). 
Our initial experiments have shown some positive results.

There are more advantages of Arrow, but I only list a few due to space 
constraints. The TPC-H performance in the attachment speaks for itself.

We understand and respect that Blink is currently being merged, and some 
changes should be postponed. We would like to provide our devoted help in this 
process.

However, requiring many changes is not a good reason to reject Arrow if we 
want to make Flink a world-class computing engine. All these difficulties can 
be overcome. We have a plan to carry out the changes incrementally.

 

 

> Add support for Apache Arrow
> 
>
> Key: FLINK-10929
> URL: https://issues.apache.org/jira/browse/FLINK-10929
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / State Backends
>Reporter: Pedro Cardoso Silva
>Priority: Minor
> Attachments: image-2019-04-10-13-43-08-107.png
>
>
> Investigate the possibility of adding support for Apache Arrow as a 
> standardized columnar, memory format for data.
> Given the activity that 

[jira] [Created] (FLINK-12361) Remove useless expression from runtime scheduler

2019-04-28 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12361:


 Summary: Remove useless expression from runtime scheduler
 Key: FLINK-12361
 URL: https://issues.apache.org/jira/browse/FLINK-12361
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Operators
Reporter: Liya Fan
Assignee: Liya Fan
 Attachments: image-2019-04-29-11-16-13-492.png

In the scheduleTask method of Scheduler class, expression forceExternalLocation 
is useless, since it always evaluates to false:

 !image-2019-04-29-11-16-13-492.png! 

So it can be removed. Moreover, by removing this expression, the code structure 
can be made much simpler, because some branches that rely on this expression 
can also be removed. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12348) Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-04-27 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan reassigned FLINK-12348:


Assignee: Liya Fan

> Use TableConfig in api module to replace TableConfig in blink-planner module.
> -
>
> Key: FLINK-12348
> URL: https://issues.apache.org/jira/browse/FLINK-12348
> Project: Flink
>  Issue Type: Task
>Reporter: Jing Zhang
>Assignee: Liya Fan
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12347) flink-table-runtime-blink is missing scala suffix

2019-04-27 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan reassigned FLINK-12347:


Assignee: Liya Fan

> flink-table-runtime-blink is missing scala suffix
> -
>
> Key: FLINK-12347
> URL: https://issues.apache.org/jira/browse/FLINK-12347
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Table SQL / Runtime
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: Liya Fan
>Priority: Blocker
> Fix For: 1.9.0
>
>
> {{flink-table-runtime-blink}} has a dependency on {{flink-streaming-java}} 
> and thus requires a scala suffix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12335) Improvement the performance of class SegmentsUtil

2019-04-26 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12335:
-
Description: 
Improve the performance of class SegmentsUtil:

To evaluate the offset, an integer is bitand with a mask to clear to low bits, 
and then shift right. The bitand is useless:


((index & BIT_BYTE_POSITION_MASK) >>> 3)

  was:
Improve the performance of class SegmentsUtil from two points:

1. In method allocateReuseBytes, the generated byte array should be cached for 
reuse, if the size does not exceed MAX_BYTES_LENGTH. However, the array is not 
cached if bytes.length < length, and this will lead to performance overhead:

if (bytes == null) {
  if (length <= MAX_BYTES_LENGTH) {
    bytes = new byte[MAX_BYTES_LENGTH]; BYTES_LOCAL.set(bytes); 
  }
  else {
    bytes = new byte[length]; 
  }
} else if (bytes.length < length) {
  bytes = new byte[length]; 
}

 

2. To evaluate the offset, an integer is bitand with a mask to clear to low 
bits, and then shift right. The bitand is useless:

 

((index & BIT_BYTE_POSITION_MASK) >>> 3)


> Improvement the performance of class SegmentsUtil
> -
>
> Key: FLINK-12335
> URL: https://issues.apache.org/jira/browse/FLINK-12335
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Minor
>
> Improve the performance of class SegmentsUtil:
> To evaluate the offset, an integer is bitand with a mask to clear to low 
> bits, and then shift right. The bitand is useless:
> ((index & BIT_BYTE_POSITION_MASK) >>> 3)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12335) Improvement the performance of class SegmentsUtil

2019-04-26 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12335:
-
Description: 
Improve the performance of class SegmentsUtil from two points:

1. In method allocateReuseBytes, the generated byte array should be cached for 
reuse, if the size does not exceed MAX_BYTES_LENGTH. However, the array is not 
cached if bytes.length < length, and this will lead to performance overhead:

if (bytes == null) {
  if (length <= MAX_BYTES_LENGTH) {
    bytes = new byte[MAX_BYTES_LENGTH]; BYTES_LOCAL.set(bytes); 
  }
  else {
    bytes = new byte[length]; 
  }
} else if (bytes.length < length) {
  bytes = new byte[length]; 
}

 

2. To evaluate the offset, an integer is bitand with a mask to clear to low 
bits, and then shift right. The bitand is useless:

 

((index & BIT_BYTE_POSITION_MASK) >>> 3)

  was:
Improve the performance of class SegmentsUtil from two points:

In method allocateReuseBytes, the generated byte array should be cached for 
reuse, if the size does not exceed MAX_BYTES_LENGTH. However, the array is not 
cached if bytes.length < length, and this will lead to performance overhead:

if (bytes == null) {
  if (length <= MAX_BYTES_LENGTH) {
    bytes = new byte[MAX_BYTES_LENGTH]; BYTES_LOCAL.set(bytes); 
  }
  else {
    bytes = new byte[length]; 
  }
} else if (bytes.length < length) {
  bytes = new byte[length]; 
}

 

2. To evaluate the offset, an integer is bitand with a mask to clear to low 
bits, and then shift right. The bitand is useless:

 

((index & BIT_BYTE_POSITION_MASK) >>> 3)


> Improvement the performance of class SegmentsUtil
> -
>
> Key: FLINK-12335
> URL: https://issues.apache.org/jira/browse/FLINK-12335
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Minor
>
> Improve the performance of class SegmentsUtil from two points:
> 1. In method allocateReuseBytes, the generated byte array should be cached 
> for reuse, if the size does not exceed MAX_BYTES_LENGTH. However, the array 
> is not cached if bytes.length < length, and this will lead to performance 
> overhead:
> if (bytes == null) {
>   if (length <= MAX_BYTES_LENGTH) {
>     bytes = new byte[MAX_BYTES_LENGTH]; BYTES_LOCAL.set(bytes); 
>   }
>   else {
>     bytes = new byte[length]; 
>   }
> } else if (bytes.length < length) {
>   bytes = new byte[length]; 
> }
>  
> 2. To evaluate the offset, an integer is bitand with a mask to clear to low 
> bits, and then shift right. The bitand is useless:
>  
> ((index & BIT_BYTE_POSITION_MASK) >>> 3)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12335) Improvement the performance of class SegmentsUtil

2019-04-26 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12335:


 Summary: Improvement the performance of class SegmentsUtil
 Key: FLINK-12335
 URL: https://issues.apache.org/jira/browse/FLINK-12335
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Reporter: Liya Fan
Assignee: Liya Fan


Improve the performance of class SegmentsUtil from two points:

In method allocateReuseBytes, the generated byte array should be cached for 
reuse, if the size does not exceed MAX_BYTES_LENGTH. However, the array is not 
cached if bytes.length < length, and this will lead to performance overhead:

if (bytes == null) {
  if (length <= MAX_BYTES_LENGTH) {
    bytes = new byte[MAX_BYTES_LENGTH]; BYTES_LOCAL.set(bytes); 
  }
  else {
    bytes = new byte[length]; 
  }
} else if (bytes.length < length) {
  bytes = new byte[length]; 
}

 

2. To evaluate the offset, an integer is bitand with a mask to clear to low 
bits, and then shift right. The bitand is useless:

 

((index & BIT_BYTE_POSITION_MASK) >>> 3)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10929) Add support for Apache Arrow

2019-04-26 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16826677#comment-16826677
 ] 

Liya Fan commented on FLINK-10929:
--

Hi [Stephan 
Ewen|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=StephanEwen] 
and [Kurt 
Young|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=ykt836], 
thanks a lot for your valuable comments. 

It seems there is not much debate for point 1). 

For point 2), I want to list a few advantages of using Arrow as internal data 
structures for Flink. These conclusions are based on our investigations of 
research papers, as well as our engineering experiences in secondary 
development on Blink.
 # Arrow adopts a columnar data format, which makes it possible to apply SIMD 
([link|http://prestodb.rocks/code/simd/]). SIMD means higher performance. Java 
started to support SIMD in Java 8, and it is expected that higher versions of 
Java will provide better support for SIMD. 
 # Arrow provides a more compact memory layout. Simply put, encoding a batch of 
rows as Arrow vectors requires much less memory space than encoding them with 
BinaryRow. This, in turn, leads to two results:
 ## With the same amount of memory space, more data can be loaded into memory, 
and fewer spills are required.
 ## The cost of shuffling can be reduced significantly. For example, the amount 
of shuffled data for TPC-H Q4 is reduced to almost half. 
 # The columnar format makes data compression much easier (see 
[paper|http://db.lcs.mit.edu/projects/cstore/abadisigmod06.pdf], for example). 
Our initial experiments have shown some positive results.

There are more advantages of Arrow, but I only list a few due to space 
constraints. The TPC-H performance in the attachment speaks for itself.

We understand and respect that Blink is currently being merged, and some 
changes should be postponed. We would like to provide our devoted help in this 
process.

However, requiring many changes is not a good reason to reject Arrow if we 
want to make Flink a world-class computing engine. All these difficulties can 
be overcome. We have a plan to carry out the changes incrementally.

 

 

> Add support for Apache Arrow
> 
>
> Key: FLINK-10929
> URL: https://issues.apache.org/jira/browse/FLINK-10929
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / State Backends
>Reporter: Pedro Cardoso Silva
>Priority: Minor
> Attachments: image-2019-04-10-13-43-08-107.png
>
>
> Investigate the possibility of adding support for Apache Arrow as a 
> standardized columnar, memory format for data.
> Given the activity that [https://github.com/apache/arrow] is currently 
> getting and its claims objective of providing a zero-copy, standardized data 
> format across platforms, I think it makes sense for Flink to look into 
> supporting it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer

2019-04-25 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16826610#comment-16826610
 ] 

Liya Fan commented on FLINK-12319:
--

Hi [~mpf], thanks again for the additional information. 

It seems like a real bug in Flink. Would you please provide more information to 
reproduce the error? Can we reproduce the problem on a local host? What is the 
implementation of the LoginEvent class?

> StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
> ---
>
> Key: FLINK-12319
> URL: https://issues.apache.org/jira/browse/FLINK-12319
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
> Environment: Ubuntu 18.04
> openjdk version "1.8.0_191"
> OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12)
> OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)
>Reporter: Marco Pfatschbacher
>Priority: Major
>
>  
> I wrote a simple SourceFunction that creates Events in a loop.
> The CEP pattern is very simple:
> {code:java}
>   final Pattern failurePattern =
>   Pattern.begin("5 or more failures", 
> AfterMatchSkipStrategy.skipPastLastEvent())
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("failed");
>   }
>   })
>   .times(5)
>   .next("1 or more successes")
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("success");
>   }
>   })
>   .times(1)
>   .within(Time.milliseconds(20));
> {code}
>  
> After about 100k Events, Flink aborts with this stacktrace:
>  
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
>     at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263)
>     at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41)
> Caused by: java.lang.StackOverflowError
>     at 
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>     at 
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> 

[jira] [Commented] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness

2019-04-25 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16826606#comment-16826606
 ] 

Liya Fan commented on FLINK-12334:
--

Hi [~cre...@gmail.com], thanks for opening this issue. Would you please provide 
more details? Which test failed? 
AbstractStreamOperatorTestHarnessTest#testInitializeAfterOpenning seems to be 
working on my computer.

> change to MockStreamTask breaks OneInputStreamOperatorTestHarness
> -
>
> Key: FLINK-12334
> URL: https://issues.apache.org/jira/browse/FLINK-12334
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.8.0
>Reporter: Cliff Resnick
>Priority: Major
>
> The MockStreamTask that the test harness is created with does not include 
> initialization of an Accumulator Map when using the builder
> [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198]
> This results in a TestHarness whose context contains an immutable empty 
> map, which breaks tests. The fix is simple: please include an actual map in 
> the builder call.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12319) StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer

2019-04-24 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16825624#comment-16825624
 ] 

Liya Fan commented on FLINK-12319:
--

Hi [~mpf], thank you for opening this issue.

It seems like a bug related to the termination condition of the recursive calls.

However, the related code no longer exists in the latest code base, so the 
problem may no longer occur in the next release. 
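
As a general illustration of how this class of StackOverflowError can be
avoided (this is not Flink's actual fix), the mutual recursion between
releaseNode and removeNode can be replaced with an explicit worklist so that
long reference chains do not grow the call stack:

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

final class IterativeReleaseSketch {
    static final class Node {
        int refCount;
        Node predecessor; // the node to release once this one has been removed

        Node(int refCount, Node predecessor) {
            this.refCount = refCount;
            this.predecessor = predecessor;
        }
    }

    static void release(Node start) {
        Deque<Node> worklist = new ArrayDeque<>();
        worklist.push(start);
        while (!worklist.isEmpty()) {
            Node node = worklist.pop();
            // Decrement the reference count; removing a node releases its predecessor.
            if (--node.refCount <= 0 && node.predecessor != null) {
                worklist.push(node.predecessor); // previously a recursive call
            }
        }
    }
}
{code}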

> StackOverFlowError in cep.nfa.sharedbuffer.SharedBuffer
> ---
>
> Key: FLINK-12319
> URL: https://issues.apache.org/jira/browse/FLINK-12319
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
> Environment: Ubuntu 18.04
> openjdk version "1.8.0_191"
> OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12)
> OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)
>Reporter: Marco Pfatschbacher
>Priority: Major
>
>  
> I wrote a simple SourceFunction that creates Events in a loop.
> The CEP pattern is very simple:
> {code:java}
>   final Pattern failurePattern =
>   Pattern.begin("5 or more failures", 
> AfterMatchSkipStrategy.skipPastLastEvent())
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("failed");
>   }
>   })
>   .times(5)
>   .next("1 or more successes")
>   .subtype(LoginEvent.class)
>   .where(
>   new IterativeCondition() {
>   @Override
>   public boolean filter(LoginEvent value, 
> Context ctx) throws Exception {
>   return 
> value.get("type").equals("success");
>   }
>   })
>   .times(1)
>   .within(Time.milliseconds(20));
> {code}
>  
> After about 100k Events, Flink aborts with this stacktrace:
>  
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
>     at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.classdump.alerts.FlinkCep.brute_force_login(FlinkCep.java:263)
>     at org.classdump.alerts.FlinkCep.main(FlinkCep.java:41)
> Caused by: java.lang.StackOverflowError
>     at 
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>     at 
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:339)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.removeNode(SharedBuffer.java:355)
>     at 
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.releaseNode(SharedBuffer.java:342)
>     

[jira] [Commented] (FLINK-12115) Add support in Flink to persist state in Azure's blob store

2019-04-22 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16823638#comment-16823638
 ] 

Liya Fan commented on FLINK-12115:
--

I vote for this issue, because it enhances the portability of Flink. I hope 
more people will pay attention to it.

> Add support in Flink to persist state in Azure's blob store
> ---
>
> Key: FLINK-12115
> URL: https://issues.apache.org/jira/browse/FLINK-12115
> Project: Flink
>  Issue Type: New Feature
>  Components: FileSystems
>Reporter: Piyush Narang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The existing set of flink-filesystems include filesystems like S3 / HDFS / 
> MapR etc. For folks using Azure, it would be nice to include support for 
> Azure's blob store as a filesystem as well. This would enable us to use Azure 
> blob store to store state / checkpoints for running Flink jobs. 
> Support for Azure's filesystem is part of the hadoop project in the 
> [hadoop-azure|https://github.com/apache/hadoop/tree/trunk/hadoop-tools/hadoop-azure]
>  module. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12290) Fix the misleading exception message in SharedSlot class

2019-04-22 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12290:


 Summary: Fix the misleading exception message in SharedSlot class
 Key: FLINK-12290
 URL: https://issues.apache.org/jira/browse/FLINK-12290
 Project: Flink
  Issue Type: Bug
Reporter: Liya Fan
Assignee: Liya Fan


The exception message in SharedSlot.releaseSlot is misleading. It says 
"SharedSlot is not empty and released". But the condition should be "SharedSlot 
is not released and empty."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12289) Fix bugs and typos in Memory manager

2019-04-22 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12289:


 Summary: Fix bugs and typos in Memory manager
 Key: FLINK-12289
 URL: https://issues.apache.org/jira/browse/FLINK-12289
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Operators
Reporter: Liya Fan
Assignee: Liya Fan


According to the JavaDoc, the MemoryManager.release method should throw an NPE 
if the input argument is null, but it does not. 

In addition, there are some typos in class MemoryManager.
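
A minimal sketch of the intended behavior (illustrative only; the parameter
type is simplified here):

{code:java}
import java.util.Objects;

final class ReleaseNullCheckSketch {
    // Fail fast with an NPE when the argument is null, as the JavaDoc promises.
    void release(Object segment) {
        Objects.requireNonNull(segment, "segment must not be null");
        // ... actual release logic ...
    }
}
{code}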



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12259) Improve debuggability of method invocation failures in OptimizerPlanEnvironment

2019-04-22 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16823035#comment-16823035
 ] 

Liya Fan commented on FLINK-12259:
--

Hi [~gauravtiwari89], I see. Replied :) Thank you.

> Improve debuggability of method invocation failures in 
> OptimizerPlanEnvironment 
> 
>
> Key: FLINK-12259
> URL: https://issues.apache.org/jira/browse/FLINK-12259
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.8.0
>Reporter: Gaurav
>Assignee: Liya Fan
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In cases where method invocation fails without setting the `optimizerPlan`, 
> Flink does not always dump the stderr/stdout. Hence, logging from the method 
> is lost. The stacktrace alone is not always helpful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12279) Create source to allow streaming data from websocket.

2019-04-21 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822827#comment-16822827
 ] 

Liya Fan commented on FLINK-12279:
--

Hi [~Wosinsan], thanks for opening this issue. Would you please give more 
details about it? This sounds like an interesting one.

> Create source to allow streaming data from websocket.
> -
>
> Key: FLINK-12279
> URL: https://issues.apache.org/jira/browse/FLINK-12279
> Project: Flink
>  Issue Type: Improvement
>Reporter: Dominik Wosiński
>Priority: Major
>
> Currently, there exists an API that allows users to read data from a regular 
> Java socket. I think we should also create an API that allows reading and 
> streaming data from websockets. Java does have the `javax.websocket-api` 
> that allows asynchronous reading from websockets, and I think it could be used 
> for this case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12259) Improve debuggability of method invocation failures in OptimizerPlanEnvironment

2019-04-18 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821637#comment-16821637
 ] 

Liya Fan commented on FLINK-12259:
--

[~quan], thanks a lot for the comments. [~gauravtiwari89], I have sent a PR, 
please help take a look.

> Improve debuggability of method invocation failures in 
> OptimizerPlanEnvironment 
> 
>
> Key: FLINK-12259
> URL: https://issues.apache.org/jira/browse/FLINK-12259
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.8.0
>Reporter: Gaurav
>Assignee: Liya Fan
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where method invocation fails without setting the `optimizerPlan`, 
> Flink does not always dump the stderr/stdout. Hence, logging from the method 
> is lost. The stacktrace alone is not always helpful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12259) Improve debuggability of method invocation failures in OptimizerPlanEnvironment

2019-04-18 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan reassigned FLINK-12259:


Assignee: Liya Fan

> Improve debuggability of method invocation failures in 
> OptimizerPlanEnvironment 
> 
>
> Key: FLINK-12259
> URL: https://issues.apache.org/jira/browse/FLINK-12259
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.8.0
>Reporter: Gaurav
>Assignee: Liya Fan
>Priority: Minor
>
> In cases where method invocation fails without setting the `optimizerPlan`, 
> Flink does not always dump the stderr/stdout. Hence, logging from the method 
> is lost. The stacktrace alone is not always helpful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12246) can't get any option from the jobConfiguration of JobGraph

2019-04-18 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16820763#comment-16820763
 ] 

Liya Fan commented on FLINK-12246:
--

Thanks for opening this issue. I do not see this as a problem. The 
configuration is empty initially, but values can be inserted into it later. 

> can't get any option from the jobConfiguration of JobGraph
> --
>
> Key: FLINK-12246
> URL: https://issues.apache.org/jira/browse/FLINK-12246
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration
>Affects Versions: 1.6.3
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
> Fix For: 1.9.0
>
>
> the jobConfiguration was defined as a final variable. 
> {code:java}
> /** The job configuration attached to this job. */
> private final Configuration jobConfiguration = new Configuration();
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12223) HeapMemorySegment.getArray should return null after being freed

2019-04-17 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12223:
-
Description: According to the JavaDoc, HeapMemorySegment.getArray should 
return null after free is called, but it does not.   (was: According to the 
JavaDoc, HeapMemorySegment.getArray should return null, but it does not. )

> HeapMemorySegment.getArray should return null after being freed
> ---
>
> Key: FLINK-12223
> URL: https://issues.apache.org/jira/browse/FLINK-12223
> Project: Flink
>  Issue Type: Bug
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Major
>
> According to the JavaDoc, HeapMemorySegment.getArray should return null after 
> free is called, but it does not. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12223) HeapMemorySegment.getArray should return null after being freed

2019-04-17 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12223:


 Summary: HeapMemorySegment.getArray should return null after being 
freed
 Key: FLINK-12223
 URL: https://issues.apache.org/jira/browse/FLINK-12223
 Project: Flink
  Issue Type: Bug
Reporter: Liya Fan
Assignee: Liya Fan


According to the JavaDoc, HeapMemorySegment.getArray should return null, but it 
does not. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12189) Fix bugs and typos in memory management related code

2019-04-16 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819706#comment-16819706
 ] 

Liya Fan commented on FLINK-12189:
--

To make the PR easier to understand and review, I have split the bugs into 
separate JIRAs, and provide UTs and PRs separately. 

> Fix bugs and typos in memory management related code
> 
>
> Key: FLINK-12189
> URL: https://issues.apache.org/jira/browse/FLINK-12189
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Operators
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There are some bugs and typos in the memory related source code. For example,
>  # In MemoryManager.release method, it should throw an NPE, if the input is 
> null. But it does not. 
>  # In HybridMemorySegment.put and get methods, the number of bytes 
> read/written should be specified by the input parameter, rather than 
> reading/writing until the end.
>  # In HeapMemorySegment, the data member memory should be returned. This is 
> because the member heapArray will not be reset to null after calling the 
> release method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12216) Respect the number of bytes from input parameters in HybridMemorySegment

2019-04-16 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12216:


 Summary: Respect the number of bytes from input parameters in 
HybridMemorySegment
 Key: FLINK-12216
 URL: https://issues.apache.org/jira/browse/FLINK-12216
 Project: Flink
  Issue Type: Bug
Reporter: Liya Fan
Assignee: Liya Fan


For the following two methods in HybridMemorySegment class,

public final void get(int offset, ByteBuffer target, int numBytes)

public final void put(int offset, ByteBuffer source, int numBytes) 

the actual number of bytes read/written should be specified by the input 
parameter numBytes, but it is not for some types of ByteBuffer. Instead, the 
methods simply read/write until the end.

So this is a bug and I am going to fix it.
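
A minimal sketch of respecting numBytes when copying from a ByteBuffer
(illustrative only; this is not the actual Flink implementation, and the heap
array parameter is a simplification):

{code:java}
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

final class PutNumBytesSketch {
    // Copy exactly numBytes from the source buffer into the heap array at offset.
    static void put(int offset, ByteBuffer source, int numBytes, byte[] heap) {
        if (source.remaining() < numBytes) {
            throw new BufferUnderflowException();
        }
        // ByteBuffer#get(byte[], int, int) transfers exactly numBytes bytes,
        // instead of draining the buffer to its end.
        source.get(heap, offset, numBytes);
    }
}
{code}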



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12210) Fix a bug in AbstractPagedInputView.readLine

2019-04-16 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12210:


 Summary: Fix a bug in AbstractPagedInputView.readLine
 Key: FLINK-12210
 URL: https://issues.apache.org/jira/browse/FLINK-12210
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Operators
Reporter: Liya Fan
Assignee: Liya Fan


In AbstractPagedInputView.readLine, the character '\r' is removed in two 
places, which is redundant. In addition, only a trailing '\r' should be removed. 
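
A minimal sketch of the intended behavior (illustrative only, not the actual
readLine code): strip a single trailing '\r' from the accumulated line and
leave any '\r' in the middle untouched.

{code:java}
final class TrailingCarriageReturnSketch {
    static String stripTrailingCR(StringBuilder line) {
        int len = line.length();
        if (len > 0 && line.charAt(len - 1) == '\r') {
            line.setLength(len - 1); // drop only the trailing '\r'
        }
        return line.toString();
    }

    public static void main(String[] args) {
        // "a\rb\r" keeps the inner '\r' and loses only the trailing one.
        System.out.println(stripTrailingCR(new StringBuilder("a\rb\r")).length()); // 3
    }
}
{code}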



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12189) Fix bugs and typos in memory management related code

2019-04-15 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12189:
-
Priority: Critical  (was: Major)

> Fix bugs and typos in memory management related code
> 
>
> Key: FLINK-12189
> URL: https://issues.apache.org/jira/browse/FLINK-12189
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Operators
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There are some bugs and typos in the memory related source code. For example,
>  # In MemoryManager.release method, it should throw an NPE, if the input is 
> null. But it does not. 
>  # In HybridMemorySegment.put and get methods, the number of bytes 
> read/written should be specified by the input parameter, rather than 
> reading/writing until the end.
>  # In HeapMemorySegment, the data member memory should be returned. This is 
> because the member heapArray will not be reset to null after calling the 
> release method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12204) Improve JDBCOutputFormat ClassCastException

2019-04-15 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818567#comment-16818567
 ] 

Liya Fan commented on FLINK-12204:
--

[~hequn8128], thanks for your fast response. Please go ahead and open your PR :)

> Improve JDBCOutputFormat ClassCastException
> ---
>
> Key: FLINK-12204
> URL: https://issues.apache.org/jira/browse/FLINK-12204
> Project: Flink
>  Issue Type: Task
>  Components: Connectors / JDBC
>Affects Versions: 1.8.0
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.9.0, 1.8.1
>
>
> ClassCastExceptions thrown by JDBCOutputFormat are not very helpful because 
> they do not provide information for which input field the cast failed.
> We should catch the exception and enrich it with information about the 
> affected field to make it more useful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12204) Improve JDBCOutputFormat ClassCastException

2019-04-15 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818499#comment-16818499
 ] 

Liya Fan commented on FLINK-12204:
--

I vote for this issue, because it helps debugging.

If no one else is looking at it, I would like to help resolve it.

> Improve JDBCOutputFormat ClassCastException
> ---
>
> Key: FLINK-12204
> URL: https://issues.apache.org/jira/browse/FLINK-12204
> Project: Flink
>  Issue Type: Task
>  Components: Connectors / JDBC
>Affects Versions: 1.8.0
>Reporter: Fabian Hueske
>Priority: Major
> Fix For: 1.9.0, 1.8.1
>
>
> ClassCastExceptions thrown by JDBCOutputFormat are not very helpful because 
> they do not provide information for which input field the cast failed.
> We should catch the exception and enrich it with information about the 
> affected field to make it more useful.
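
A minimal sketch of the enrichment idea (illustrative only; not the actual
JDBCOutputFormat change, and the INTEGER case is just an example):

{code:java}
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class EnrichedCastErrorSketch {
    // Report which field index (and value) failed, instead of a bare ClassCastException.
    static void setIntField(PreparedStatement statement, int pos, Object value) throws SQLException {
        try {
            statement.setInt(pos, (Integer) value); // the configured type says INTEGER
        } catch (ClassCastException e) {
            throw new RuntimeException(
                "Could not cast field at position " + pos + " (value: " + value
                    + ") to the configured SQL type", e);
        }
    }
}
{code}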



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12187) wrap FileWriter with BufferedWriter for better performance

2019-04-15 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817620#comment-16817620
 ] 

Liya Fan commented on FLINK-12187:
--

[~bd2019us], thanks for opening this issue. Could you please give some 
references to show that the performance of BufferedWriter is better?

> wrap FileWriter with BufferedWriter for better performance
> --
>
> Key: FLINK-12187
> URL: https://issues.apache.org/jira/browse/FLINK-12187
> Project: Flink
>  Issue Type: Bug
>  Components: Examples
>Reporter: bd2019us
>Priority: Major
>  Labels: patch-available, pull-requests-available
> Attachments: 1.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Location: 
> src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
> The FileWriter.write() method is invoked multiple times in loops, which is 
> bad for the performance of the program. The decorator class BufferedWriter can 
> be used to alleviate the impact of frequent I/O operations by buffering. 
> Therefore, when the write() method is used intensively, BufferedWriter is 
> highly recommended and should be preferred.
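
A minimal sketch of the suggested change (the file name here is hypothetical):

{code:java}
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

public class BufferedWriteDemo {
    public static void main(String[] args) throws IOException {
        // Wrapping the FileWriter in a BufferedWriter means repeated write() calls
        // hit an in-memory buffer and are flushed to disk in larger chunks.
        try (BufferedWriter out = new BufferedWriter(new FileWriter("weblog.txt"))) {
            for (int i = 0; i < 100_000; i++) {
                out.write("line " + i + "\n");
            }
        } // try-with-resources flushes and closes the writer
    }
}
{code}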



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12189) Fix bugs and typos in memory management related code

2019-04-15 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12189:
-
Summary: Fix bugs and typos in memory management related code  (was: Fix 
bugs and typos in memory management related classes)

> Fix bugs and typos in memory management related code
> 
>
> Key: FLINK-12189
> URL: https://issues.apache.org/jira/browse/FLINK-12189
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Operators
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Major
>
> There are some bugs and typos in the memory related source code. For example,
>  # In MemoryManager.release method, it should throw an NPE, if the input is 
> null. But it does not. 
>  # In HybridMemorySegment.put and get methods, the number of bytes 
> read/written should be specified by the input parameter, rather than 
> reading/writing until the end.
>  # In HeapMemorySegment, the data member memory should be returned. This is 
> because the member heapArray will not be reset to null after calling the 
> release method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12189) Fix bugs and typos in memory management related classes

2019-04-15 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12189:


 Summary: Fix bugs and typos in memory management related classes
 Key: FLINK-12189
 URL: https://issues.apache.org/jira/browse/FLINK-12189
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Operators
Reporter: Liya Fan
Assignee: Liya Fan


There are some bugs and typos in the memory related source code. For example,
 # In MemoryManager.release method, it should throw an NPE, if the input is 
null. But it does not. 
 # In HybridMemorySegment.put and get methods, the number of bytes read/written 
should be specified by the input parameter, rather than reading/writing until 
the end.
 # In HeapMemorySegment, the data member memory should be returned. This is 
because the member heapArray will not be reset to null after calling the 
release method.
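
For the third point, a minimal sketch of the intended fix (illustrative only;
the field names follow the issue description, not necessarily the actual code):

{code:java}
final class HeapSegmentSketch {
    private byte[] memory;    // cleared when the segment is freed
    private byte[] heapArray; // not cleared on free in the reported bug

    HeapSegmentSketch(byte[] array) {
        this.memory = array;
        this.heapArray = array;
    }

    void free() {
        this.memory = null; // releasing the segment drops this reference
    }

    byte[] getArray() {
        return this.memory; // returns null after free(), matching the documented behavior
    }
}
{code}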



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12162) Build error in flink-table-planner

2019-04-11 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815361#comment-16815361
 ] 

Liya Fan commented on FLINK-12162:
--

Hi [~dawidwys], thanks for your comments. This problem is triggered only by 
early versions of Java 8, as you indicated.

Hope none of our customers are still using those early versions of the JDK. 

> Build error in flink-table-planner
> --
>
> Key: FLINK-12162
> URL: https://issues.apache.org/jira/browse/FLINK-12162
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There is a build error in project flink-table-planner:
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile 
> (default-compile) on project flink-table-planner_2.11: Compilation failure
>  [ERROR] 
> .../flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java:[85,54]
>  unreported exception X; must be caught or declared to be thrown
> I am using JDK 1.8.0_45, maven 3.5.2, my OS is Linux with kernel 3.10.0-327



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10929) Add support for Apache Arrow

2019-04-11 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815264#comment-16815264
 ] 

Liya Fan commented on FLINK-10929:
--

Hi [~fhueske], thank you for your kind reply. I agree with most points. The 
changes should be incremental so as not to break other components. So I think 
the first step is to provide a flag which is disabled by default, and let the 
MemoryManager depend on the Arrow Buffer Allocator. With this change, all 
MemorySegments will be Arrow buffers, but this is transparent to other 
components and will not break them.

I guess I will initiate a discussion on the dev mailing list.

BTW, my mail address has been removed from the mailing list for a couple of 
days, because dev-help claims that it received some bouncing messages. Would 
you please add another email address of mine (fan_li...@aliyun.com) to the 
mailing list? Thank you in advance.

> Add support for Apache Arrow
> 
>
> Key: FLINK-10929
> URL: https://issues.apache.org/jira/browse/FLINK-10929
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / State Backends
>Reporter: Pedro Cardoso Silva
>Priority: Minor
> Attachments: image-2019-04-10-13-43-08-107.png
>
>
> Investigate the possibility of adding support for Apache Arrow as a 
> standardized columnar, memory format for data.
> Given the activity that [https://github.com/apache/arrow] is currently 
> getting and its claimed objective of providing a zero-copy, standardized data 
> format across platforms, I think it makes sense for Flink to look into 
> supporting it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12162) Build error in flink-table-planner

2019-04-11 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815229#comment-16815229
 ] 

Liya Fan commented on FLINK-12162:
--

Hi [~dawidwys], thanks a lot for your kind reminder. I checked the branch, and 
it is master. I synced the code from apache/flink more than an hour ago. 
Unfortunately, the problem reproduces consistently.

> Build error in flink-table-planner
> --
>
> Key: FLINK-12162
> URL: https://issues.apache.org/jira/browse/FLINK-12162
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Critical
>
> There is a build error in project flink-table-planner:
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile 
> (default-compile) on project flink-table-planner_2.11: Compilation failure
>  [ERROR] 
> .../flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java:[85,54]
>  unreported exception X; must be caught or declared to be thrown
> I am using JDK 1.8.0_45, maven 3.5.2, my OS is Linux with kernel 3.10.0-327



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12162) Build error in flink-table-planner

2019-04-11 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-12162:
-
Description: 
There is a build error in project flink-table-planner:

[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-compile) 
on project flink-table-planner_2.11: Compilation failure
 [ERROR] 
.../flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java:[85,54]
 unreported exception X; must be caught or declared to be thrown

I am using JDK 1.8.0_45, maven 3.5.2, my OS is Linux with kernel 3.10.0-327

  was:
There is a build error in project flink-table-planner:

[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-compile) 
on project flink-table-planner_2.11: Compilation failure
[ERROR] 
.../flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java:[85,54]
 unreported exception X; must be caught or declared to be thrown

I am using JDK 1.8.0_45, maven 3.5.2


> Build error in flink-table-planner
> --
>
> Key: FLINK-12162
> URL: https://issues.apache.org/jira/browse/FLINK-12162
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Critical
>
> There is a build error in project flink-table-planner:
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile 
> (default-compile) on project flink-table-planner_2.11: Compilation failure
>  [ERROR] 
> .../flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java:[85,54]
>  unreported exception X; must be caught or declared to be thrown
> I am using JDK 1.8.0_45, maven 3.5.2, my OS is Linux with kernel 3.10.0-327



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12162) Build error in flink-table-planner

2019-04-11 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12162:


 Summary: Build error in flink-table-planner
 Key: FLINK-12162
 URL: https://issues.apache.org/jira/browse/FLINK-12162
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: Liya Fan
Assignee: Liya Fan


There is a build error in project flink-table-planner:

[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-compile) 
on project flink-table-planner_2.11: Compilation failure
[ERROR] 
.../flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java:[85,54]
 unreported exception X; must be caught or declared to be thrown

I am using JDK 1.8.0_45, maven 3.5.2



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10929) Add support for Apache Arrow

2019-04-11 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815155#comment-16815155
 ] 

Liya Fan edited comment on FLINK-10929 at 4/11/19 7:45 AM:
---

Hi [~yanghua], thank you so much for your feedback. Below is an initial draft 
of our design document:

[https://docs.google.com/document/d/1LstaiGzlzTdGUmyG_-9GSWleKpLRid8SJWXQCTqcQB4/edit?usp=sharing]

Please give your valuable comments.


was (Author: fan_li_ya):
Hi [~yanghua], thank you so much for your feedback. Below is an initial draft 
of our design document:

https://docs.google.com/document/d/1LstaiGzlzTdGUmyG_-9GSWleKpLRid8SJWXQCTqcQB4/edit?usp=sharing
 

> Add support for Apache Arrow
> 
>
> Key: FLINK-10929
> URL: https://issues.apache.org/jira/browse/FLINK-10929
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / State Backends
>Reporter: Pedro Cardoso Silva
>Priority: Minor
> Attachments: image-2019-04-10-13-43-08-107.png
>
>
> Investigate the possibility of adding support for Apache Arrow as a 
> standardized columnar, memory format for data.
> Given the activity that [https://github.com/apache/arrow] is currently 
> getting and its claimed objective of providing a zero-copy, standardized data 
> format across platforms, I think it makes sense for Flink to look into 
> supporting it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10929) Add support for Apache Arrow

2019-04-11 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815155#comment-16815155
 ] 

Liya Fan commented on FLINK-10929:
--

Hi [~yanghua], thank you so much for your feedback. Below is an initial draft 
of our design document:

https://docs.google.com/document/d/1LstaiGzlzTdGUmyG_-9GSWleKpLRid8SJWXQCTqcQB4/edit?usp=sharing
 

> Add support for Apache Arrow
> 
>
> Key: FLINK-10929
> URL: https://issues.apache.org/jira/browse/FLINK-10929
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / State Backends
>Reporter: Pedro Cardoso Silva
>Priority: Minor
> Attachments: image-2019-04-10-13-43-08-107.png
>
>
> Investigate the possibility of adding support for Apache Arrow as a 
> standardized columnar, memory format for data.
> Given the activity that [https://github.com/apache/arrow] is currently 
> getting and its claimed objective of providing a zero-copy, standardized data 
> format across platforms, I think it makes sense for Flink to look into 
> supporting it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException

2019-04-11 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815127#comment-16815127
 ] 

Liya Fan commented on FLINK-11799:
--

Hi [~longtimer], thanks a lot for your feedback. I have prepared a PR, please 
take a look and give your valuable comments.

> KryoSerializer/OperatorChain ignores copy failure resulting in 
> NullPointerException
> ---
>
> Key: FLINK-11799
> URL: https://issues.apache.org/jira/browse/FLINK-11799
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Jason Kania
>Assignee: Liya Fan
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> I was encountering a problem with NullPointerExceptions with the deserialized 
> object reaching my ProcessFunction process() method implementation as a null 
> value. Upon investigation, I discovered two issues with the implementation of 
> the KryoSerializer copy().
> 1) The 'public T copy(T from)' method swallows the error if the kryo copy() 
> call generates an exception. The code should report the copy error at least 
> once as a warning to be aware that the kryo copy() is failing. I understand 
> that the code is there to handle the lack of a copy implementation but due to 
> the potential inefficiency of having to write and read the object instead of 
> copying it, this would seem useful information to share at the least. It is 
> also important to have a warning in case the cause of the copy error is 
> something that needs to be fixed.
> 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the 
> fact that the kryo readObject(Input input, Class aClass) method may return a 
> null value if there are any issues. This could be handled with a check or 
> warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method 
> but is also ignored there, allowing a null value to be passed along without 
> providing any reason for the null value in logging.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10929) Add support for Apache Arrow

2019-04-10 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815009#comment-16815009
 ] 

Liya Fan commented on FLINK-10929:
--

We have imported Arrow in our efforts to vectorize Blink batch jobs. It is an 
incremental change, which can be easily turned off with a single flag, and it 
does not affect other parts of the code base.

[~fhueske], do you think it is a good time to make some initial attempts to 
incorporate such changes now ? :)

> Add support for Apache Arrow
> 
>
> Key: FLINK-10929
> URL: https://issues.apache.org/jira/browse/FLINK-10929
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / State Backends
>Reporter: Pedro Cardoso Silva
>Priority: Minor
> Attachments: image-2019-04-10-13-43-08-107.png
>
>
> Investigate the possibility of adding support for Apache Arrow as a 
> standardized columnar, memory format for data.
> Given the activity that [https://github.com/apache/arrow] is currently 
> getting and its claimed objective of providing a zero-copy, standardized data 
> format across platforms, I think it makes sense for Flink to look into 
> supporting it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException

2019-04-10 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan reassigned FLINK-11799:


Assignee: Liya Fan

> KryoSerializer/OperatorChain ignores copy failure resulting in 
> NullPointerException
> ---
>
> Key: FLINK-11799
> URL: https://issues.apache.org/jira/browse/FLINK-11799
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Jason Kania
>Assignee: Liya Fan
>Priority: Major
>
> I was encountering a problem with NullPointerExceptions with the deserialized 
> object reaching my ProcessFunction process() method implementation as a null 
> value. Upon investigation, I discovered two issues with the implementation of 
> the KryoSerializer copy().
> 1) The 'public T copy(T from)' method swallows the error if the kryo copy() 
> call generates an exception. The code should report the copy error at least 
> once as a warning to be aware that the kryo copy() is failing. I understand 
> that the code is there to handle the lack of a copy implementation but due to 
> the potential inefficiency of having to write and read the object instead of 
> copying it, this would seem useful information to share at the least. It is 
> also important to have a warning in case the cause of the copy error is 
> something that needs to be fixed.
> 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the 
> fact that the kryo readObject(Input input, Class aClass) method may return a 
> null value if there are any issues. This could be handled with a check or 
> warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method 
> but is also ignored there, allowing a null value to be passed along without 
> providing any reason for the null value in logging.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3685) Logical error in code for DateSerializer deserialize with reuse

2019-04-10 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814396#comment-16814396
 ] 

Liya Fan commented on FLINK-3685:
-

Hi [~bowen.zheng], thank you for opening this issue. The NPE indicates a bug in 
the system. However, I do not think it is DateSerializer's responsibility, 
because it is the caller's responsibility to create the reuse object and make 
sure it is not null. The bug should be elsewhere.

Can you please provide the complete source code to reproduce the problem?

BTW, the latest code no longer uses -1 as an indicator for null. The latest 
code looks like this:

{code}
@Override
public Date deserialize(Date reuse, DataInputView source) throws IOException {
    final long v = source.readLong();
    if (v == Long.MIN_VALUE) {
        return null;
    }
    reuse.setTime(v);
    return reuse;
}
{code}

> Logical error in code for DateSerializer deserialize with reuse
> ---
>
> Key: FLINK-3685
> URL: https://issues.apache.org/jira/browse/FLINK-3685
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.0.0
>Reporter: ZhengBowen
>Priority: Major
>
> There is a logical error in the following function in DateSerializer.java 
> when source read '-1'
> function is:
> {code}
> public Date deserialize(Date reuse, DataInputView source) throws IOException {
>   long v = source.readLong();
>   if(v == -1L) {
>   return null;
>   }
>   reuse.setTime(v);
>   return reuse;
> }
> {code}
> when call this function for first time, if return null, then 'reuse' will be 
> set null by caller;
> when call this function for second time,if 'v!=-1' ,reuse.setTime(v) will 
> throw NPE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException

2019-04-10 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814248#comment-16814248
 ] 

Liya Fan commented on FLINK-11799:
--

Hi Jason, thanks for opening this issue. For problem 1), I don't think it is a 
big problem, although at least a warning should be logged. The copy method only 
swallows exceptions of type KryoException, and in that case it falls back to 
copying the record through serialization (writing it out and reading it back). 
This should be fine for data that cannot be copied directly.

For problem 2), I think it is a good point, because at this point in the code 
it can be asserted that the return value should never be null. However, I think 
this check should be made by the copy or readObject method, as sketched below.
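
To make the suggestion concrete, below is a minimal sketch (not the actual 
Flink KryoSerializer implementation; the logging and exception types are 
placeholders) of a copy with a logged fallback and a null check on the re-read 
value:

{code}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.ByteArrayOutputStream;

public class CopyWithFallbackSketch {

    private final Kryo kryo = new Kryo();

    @SuppressWarnings("unchecked")
    public <T> T copy(T from) {
        if (from == null) {
            return null;
        }
        try {
            return kryo.copy(from);
        } catch (KryoException ke) {
            // Problem 1: do not swallow the failure silently; report it at least once.
            System.err.println("Kryo copy failed, falling back to serialization: " + ke);

            // Fallback: copy the record by writing it out and reading it back.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            Output output = new Output(baos);
            kryo.writeObject(output, from);
            output.close();

            Input input = new Input(baos.toByteArray());
            T copy = (T) kryo.readObject(input, from.getClass());
            input.close();

            // Problem 2: readObject may return null; fail fast with a clear message
            // instead of letting the null propagate to downstream operators.
            if (copy == null) {
                throw new IllegalStateException(
                        "Kryo readObject returned null while copying an instance of "
                                + from.getClass().getName());
            }
            return copy;
        }
    }
}
{code}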

If you think it is OK, I can take care of this issue :)

> KryoSerializer/OperatorChain ignores copy failure resulting in 
> NullPointerException
> ---
>
> Key: FLINK-11799
> URL: https://issues.apache.org/jira/browse/FLINK-11799
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Jason Kania
>Priority: Major
>
> I was encountering a problem with NullPointerExceptions with the deserialized 
> object reaching my ProcessFunction process() method implementation as a null 
> value. Upon investigation, I discovered two issues with the implementation of 
> the KryoSerializer copy().
> 1) The 'public T copy(T from)' method swallows the error if the kryo copy() 
> call generates an exception. The code should report the copy error at least 
> once as a warning to be aware that the kryo copy() is failing. I understand 
> that the code is there to handle the lack of a copy implementation but due to 
> the potential inefficiency of having to write and read the object instead of 
> copying it, this would seem useful information to share at the least. It is 
> also important to have a warning in case the cause of the copy error is 
> something that needs to be fixed.
> 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the 
> fact that the kryo readObject(Input input, Class aClass) method may return a 
> null value if there are any issues. This could be handled with a check or 
> warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method 
> but is also ignored there, allowing a null value to be passed along without 
> providing any reason for the null value in logging.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12153) Error when submitting a Flink job to the Flink environment

2019-04-10 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814230#comment-16814230
 ] 

Liya Fan commented on FLINK-12153:
--

According to the error message, your Hadoop version is too old. 

Please try again with a newer version of Hadoop (2.7 or later, as the error 
message indicates).

> Error when submitting a Flink job to the Flink environment
> --
>
> Key: FLINK-12153
> URL: https://issues.apache.org/jira/browse/FLINK-12153
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.7.2
> Environment: flink maven
> 
>  org.apache.flink
>  flink-streaming-java_2.12
>  1.7.1
> 
> 
> 
>  org.apache.flink
>  flink-connector-kafka-0.11_2.12
>  1.7.1
> 
> 
>  org.apache.hadoop
>  hadoop-hdfs
>  2.7.2
>  
>  
>  xml-apis
>  xml-apis
>  
>  
> 
> 
>  org.apache.hadoop
>  hadoop-common
>  2.7.2
> 
> 
> 
>  org.apache.flink
>  flink-hadoop-compatibility_2.12
>  1.7.1
> 
> 
> 
>  org.apache.flink
>  flink-connector-filesystem_2.12
>  1.7.1
> 
> 
> 
>  org.apache.flink
>  flink-connector-elasticsearch5_2.12
>  1.7.1
> 
>  
>  
> hadoop environment version 2.7.7
>  
>Reporter: gaojunjie
>Priority: Major
>
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are 
> only supported for HDFS and for Hadoop version 2.7 or newer
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(HadoopRecoverableWriter.java:57)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10130) How to define two hdfs name-node IPs in flink-conf.yaml file

2019-04-10 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814097#comment-16814097
 ] 

Liya Fan commented on FLINK-10130:
--

[~Paul Lin] is right. An HDFS nameservice (HDFS HA) solves the problem, as 
illustrated below. So this issue can be closed.
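
For reference, a hedged example of that setup (the nameservice name "mycluster" 
and the namenode hosts are placeholders; the Flink keys are the ones from the 
issue description):

{code}
# flink-conf.yaml: point Flink at the logical nameservice instead of a single namenode IP
state.checkpoints.dir: hdfs://mycluster/flinkdatastorage
state.backend.fs.checkpointdir: hdfs://mycluster/flinkdatastorage
high-availability.zookeeper.storageDir: hdfs://mycluster/flink/recovery
{code}

{code}
<!-- hdfs-site.xml (visible to Flink via HADOOP_CONF_DIR): map the nameservice to both namenodes -->
<property><name>dfs.nameservices</name><value>mycluster</value></property>
<property><name>dfs.ha.namenodes.mycluster</name><value>nn1,nn2</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn1</name><value>namenode1:9001</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn2</name><value>namenode2:9001</value></property>
<property><name>dfs.client.failover.proxy.provider.mycluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value></property>
{code}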

> How to define two hdfs name-node IPs in flink-conf.yaml file
> 
>
> Key: FLINK-10130
> URL: https://issues.apache.org/jira/browse/FLINK-10130
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Reporter: Keshav Lodhi
>Priority: Major
> Attachments: docker-entrypoints.sh
>
>
> Hi Team,
> Here is, what we are looking for:
>  * We have  flink HA dockerized cluster with (3 zookeepers, 2 job-managers, 3 
> task-managers).
>  * We are using HDFS from the flink to store some data. The problem we are 
> facing is that, we are not able to pass 2 name-node IPs in config. 
>  * These are the config parameters we want to add two name-node IPs
>  #       "state.checkpoints.dir: hdfs://X.X.X.X:9001/flinkdatastorage"
>  #       "state.backend.fs.checkpointdir: 
> hdfs://X.X.X.X:9001/flinkdatastorage"
>  #       "high-availability.zookeeper.storageDir: 
> hdfs://X.X.X.X:9001/flink/recovery"
> Currently we are passing only one name-node IP
> Please advise. 
> I have attached the sample *docker-entrypoint.sh* file: 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10391) MillisOfDay is used in place of instant for LocalTime ctor in AvroKryoSerializerUtils

2019-04-09 Thread Liya Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liya Fan updated FLINK-10391:
-
Attachment: image-2019-04-10-13-54-13-788.png

> MillisOfDay is used in place of instant for LocalTime ctor in 
> AvroKryoSerializerUtils
> -
>
> Key: FLINK-10391
> URL: https://issues.apache.org/jira/browse/FLINK-10391
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Reporter: Ted Yu
>Priority: Minor
> Attachments: image-2019-04-10-13-54-13-788.png
>
>
> From the JodaLocalTimeSerializer#write, we serialize getMillisOfDay() value 
> from LocalTime.
> For read method:
> {code}
>   final int time = input.readInt(true);
>   return new LocalTime(time, 
> ISOChronology.getInstanceUTC().withZone(DateTimeZone.UTC));
> {code}
> It seems 
> http://joda-time.sourceforge.net/apidocs/org/joda/time/LocalTime.html#fromMillisOfDay(long,%20org.joda.time.Chronology)
>  should be used instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10391) MillisOfDay is used in place of instant for LocalTime ctor in AvroKryoSerializerUtils

2019-04-09 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814075#comment-16814075
 ] 

Liya Fan commented on FLINK-10391:
--

The current implementation already uses 
org.joda.time.LocalTime.fromMillisOfDay. So this issue can be closed.

!image-2019-04-10-13-54-13-788.png!
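
For completeness, a small illustrative snippet (plain Joda-Time, not Flink 
code) showing why fromMillisOfDay is the right factory for a serialized 
millis-of-day value:

{code}
import org.joda.time.LocalTime;

public class LocalTimeDemo {
    public static void main(String[] args) {
        long millisOfDay = 3_600_000L; // one hour into the day

        // Interprets the value as millis-of-day: prints 01:00:00.000
        LocalTime viaFactory = LocalTime.fromMillisOfDay(millisOfDay);

        // Interprets the value as an epoch instant in the default time zone,
        // which is not what a serialized millis-of-day value means.
        LocalTime viaConstructor = new LocalTime(millisOfDay);

        System.out.println(viaFactory);
        System.out.println(viaConstructor);
    }
}
{code}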

> MillisOfDay is used in place of instant for LocalTime ctor in 
> AvroKryoSerializerUtils
> -
>
> Key: FLINK-10391
> URL: https://issues.apache.org/jira/browse/FLINK-10391
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Reporter: Ted Yu
>Priority: Minor
> Attachments: image-2019-04-10-13-54-13-788.png
>
>
> From the JodaLocalTimeSerializer#write, we serialize getMillisOfDay() value 
> from LocalTime.
> For read method:
> {code}
>   final int time = input.readInt(true);
>   return new LocalTime(time, 
> ISOChronology.getInstanceUTC().withZone(DateTimeZone.UTC));
> {code}
> It seems 
> http://joda-time.sourceforge.net/apidocs/org/joda/time/LocalTime.html#fromMillisOfDay(long,%20org.joda.time.Chronology)
>  should be used instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

