[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...

2017-09-15 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4665
  
@aljoscha yes, I reviewed the code today. It judges each window individually for lateness, so my previous method counted more dropped data than the actual number. I have fixed the error and re-pushed; please help me review the code again, thanks.
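
For reference, the per-window check this now relies on looks roughly like this (paraphrased from WindowOperator; not an exact copy of the source):

```java
// paraphrased sketch of WindowOperator#isWindowLate: a window counts as late
// once even its cleanup time (window end + allowed lateness) is not after
// the current watermark
protected boolean isWindowLate(W window) {
	return windowAssigner.isEventTime()
		&& (cleanupTime(window) <= internalTimerService.currentWatermark());
}
```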


---


[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...

2017-09-14 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4665
  
@aljoscha I agree that the name should be "numLateElementsDropped". And do you mean that my result should subtract the number of elements that go to the side output, i.e. the ones that are both skipped and late?


---


[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...

2017-09-19 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4665
  
Hello, can this be merged, @aljoscha @zentol?


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-23 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140634868
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
 		// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
 		if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
--- End diff --

It looks better this way, I will adjust it here :)


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-23 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140635202
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---
@@ -231,6 +231,8 @@ public void merge(W mergeResult,
 		// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
 		if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
--- End diff --

I think it should be written this way:

```
if (isSkippedElement && isElementLate(element)) {
	if (lateDataOutputTag != null) {
		sideOutput(element);
	} else {
		this.lostDataCount.inc();
	}
}
```


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-13 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138789377
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -132,6 +133,13 @@
 */
protected final OutputTag lateDataOutputTag;
 
+   /**
+   * Metrics about the lost data due to arrive late.
+   * */
+   protected final String loseData = "lost_data";
+
+   protected Counter lostDataCount;
--- End diff --

@zentol if I use `protected final Counter lostDataCount = new SimpleCounter()` then I run into `Caused by: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter`, because `SimpleCounter` is not Serializable. So I think I should keep the old way and call `this.lostDataCount = metrics.counter(LATE_ELEMENTS_METRIC_NAME);` in the open method.
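
That is, roughly (a sketch of the open()-time registration; the exact field modifiers and metric-group access may differ from the PR code):

```java
protected static final String LATE_ELEMENTS_METRIC_NAME = "numLateElementsDropped";

// plain (non-final) reference; the counter itself is created after deserialization
protected transient Counter lostDataCount;

@Override
public void open() throws Exception {
	super.open();
	// create the counter here instead of at field initialization: operator fields
	// are serialized on job submission, and SimpleCounter is not Serializable
	this.lostDataCount = metrics.counter(LATE_ELEMENTS_METRIC_NAME);
}
```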


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-13 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138778844
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -132,6 +133,13 @@
 */
protected final OutputTag lateDataOutputTag;
 
+   /**
+   * Metrics about the lost data due to arrive late.
+   * */
+   protected final String loseData = "lost_data";
+
+   protected Counter lostDataCount;
--- End diff --

@zentol I have adjusted my code according to the comment and added documentation for this metric; please review again, thanks.


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-19 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r139742368
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
 		// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
 		if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
 			sideOutput(element);
+		} else if (isSkippedElement) {
--- End diff --

I think that when `isSkippedElement` is true, `isElementLate(element)` is always true as well. `isSkippedElement` is true when, for every assigned window, window.endTime + allowedLateness < currentLowWatermark, while `isElementLate` is true when element.timestamp + allowedLateness < currentLowWatermark, and element.timestamp is <= the largest window.endTime. So doesn't `isElementLate` always hold when `isSkippedElement` is true? And if I want to rule out the case **because no windows were assigned to it**, I just need to check whether the variable `Collection elementWindows` is empty, right?
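
Spelling the implication out (writing w.end for a window's end timestamp; strict vs. non-strict off-by-one details aside):

```
isSkippedElement:   for every assigned window w:   w.end + allowedLateness < currentLowWatermark
element bound:      element.timestamp <= max over assigned w of w.end
therefore:          element.timestamp + allowedLateness < currentLowWatermark   (= isElementLate)
```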


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-20 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r139933981
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
 		// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
 		if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
 			sideOutput(element);
+		} else if (isSkippedElement) {
--- End diff --

Haha, I see what you mean now, I think it's interesting ^_^. I pushed the code again and modified the metric name according to @zentol's suggestion; please review the code again @aljoscha.


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-12 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/4665

[Flink-7611]add metrics to measure the num of data dropped due to the data arrived late

## What is the purpose of the change

1. Add metrics to measure the number of records dropped because they arrived late; this is meaningful for guiding users toward a suitable allowedLateness or max out-of-orderness time.


## Brief change log

  - register the counter metric in WindowOperator#open()
  - invoke the inc() method when isWindowLate() judges an element late

## Verifying this change

This change is already covered by existing tests, run via `mvn clean verify`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-7611

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4665.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4665


commit 35b684ce1f3b72018ced26af07808390dff68547
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-09-12T04:26:08Z

add metrics to measure the data dropped due to arrive late

commit aabdc224cb62b29975834e11c1374d182c4d4d01
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-09-12T05:58:07Z

adjust format




---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-12 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138515752
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -132,6 +133,13 @@
 */
protected final OutputTag lateDataOutputTag;
 
+   /**
+   * Metrics about the lost data due to arrive late.
+   * */
+   protected final String loseData = "lost_data";
+
+   protected Counter lostDataCount;
--- End diff --

But I think the OperatorIOMetricGroup is only about an operator's I/O metrics; is it suitable for this?


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-12 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138515541
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -132,6 +133,13 @@
 */
protected final OutputTag lateDataOutputTag;
 
+   /**
+   * Metrics about the lost data due to arrive late.
+   * */
+   protected final String loseData = "lost_data";
+
+   protected Counter lostDataCount;
--- End diff --

Do you mean that I should add this to the OperatorIOMetricGroup?


---


[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...

2017-09-28 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4665
  
Is there still anything wrong, @zentol?


---


[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...

2017-09-25 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4665
  
Is this OK to be merged, @bowenli86 @zentol @aljoscha? 😄


---


[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...

2017-11-27 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4935
  
ping @tzulitai ~


---


[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...

2017-11-30 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4935
  
Hi @tzulitai,
After reading the KafkaConsumer code again, I found that the per-partition Kafka lag metric is registered in the method `FetchManagerMetrics#recordPartitionLag`. But when the client receives `max.poll.records` records in a single poll, it returns the polled records early, leaving some partitions that `sendFetches` has not yet been issued to, so those partitions are missed. In a test where we poll only once and then register the Kafka metrics, with many partitions (about 100) some partition lag metrics get lost.
So I think that with a configurable property, users can opt in when they have many partitions, and it will do little harm to performance.
Please let me know your idea, thanks.
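
To make the proposal concrete, a sketch of the retry-bounded registration (the property key and the registerKafkaMetrics helper are made up for illustration; the real code lives in KafkaConsumerThread#run):

```java
// hypothetical: keep re-trying the registration for the first N polls, since
// consumer.metrics() only contains a partition's lag metric after a fetch has
// actually been sent to that partition
int maxRegisterAttempts = Integer.parseInt(
	properties.getProperty("flink.kafka.metrics.register.retries", "10")); // made-up key
int registerAttempts = 0;

while (running) {
	ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
	if (useMetrics && registerAttempts < maxRegisterAttempts) {
		registerKafkaMetrics(consumer.metrics()); // hypothetical helper: adds new metrics, skips known ones
		registerAttempts++;
	}
	// ... hand the records over for emission ...
}
```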


---


[GitHub] flink pull request #4935: [Flink-7945][Metrics]Fix per partition-l...

2017-11-03 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4935#discussion_r148738495
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -245,6 +238,23 @@ public void run() {
 			if (records == null) {
 				try {
 					records = consumer.poll(pollTimeout);
+					// register Kafka's very own metrics in Flink's metric reporters
+					if (useMetrics && !records.isEmpty()) {
+						// register Kafka metrics to Flink
+						Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+						if (metrics == null) {
+							// MapR's Kafka implementation returns null here.
+							log.info("Consumer implementation does not support metrics");
+						} else {
+							// we have Kafka metrics, register them
+							for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
--- End diff --

I have implemented registering several times and then skipping the registration afterwards; with that, the related metrics register successfully. Please let me know if you have any suggestions, thanks~ And the commits have been squashed :)


---


[GitHub] flink pull request #4935: [Flink-7945][Metrics]Fix per partition-l...

2017-11-03 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4935#discussion_r148738023
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -245,6 +238,23 @@ public void run() {
 			if (records == null) {
 				try {
 					records = consumer.poll(pollTimeout);
+					// register Kafka's very own metrics in Flink's metric reporters
+					if (useMetrics && !records.isEmpty()) {
+						// register Kafka metrics to Flink
+						Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+						if (metrics == null) {
+							// MapR's Kafka implementation returns null here.
+							log.info("Consumer implementation does not support metrics");
--- End diff --

I changed the log level to debug.


---


[GitHub] flink pull request #4935: [Flink-7945][Metrics]Fix per partition-l...

2017-11-01 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/4935

[Flink-7945][Metrics]Fix per partition-lag metric lost in kafka connector

## What is the purpose of the change

*When using the Kafka connector, we can't get the per-partition lag metric, even though it has been exposed since Kafka 0.10.2 (https://issues.apache.org/jira/browse/KAFKA-4381). After reading the Kafka code, I found that the per-partition lag is registered only after `KafkaConsumer#poll` has been invoked, so I changed the point at which Flink registers the metric. After this, with kafka-connector10 and kafka-connector11 we can see the correct lag metric.*

## Brief change log

  - *Change the point at which the Flink kafka-connector registers the Kafka metrics*


## Verifying this change

This change has already been run through the existing test cases.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-7945

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4935.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4935


commit 4f0e405fd0e697e67a0d4dc301d85244fc031086
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-10-31T12:51:46Z

change the way to get metric in kafkaConsumerThread

commit 183eea766ab6302c4f0813b2372f95a299ead67d
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-10-31T14:44:19Z

overrride the createCallBridge method in kafkaFetcher10

commit d109efe7e2290eafdedf21fa7fbb4b8ac2d1bb58
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-10-31T15:11:41Z

remove unused import

commit 7dd26b6ddfe0f16ac57d9810dc46ae6b9fb34d18
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-10-31T15:13:34Z

checkstyle

commit 61db98e0469d85755d6cea560e110f61b6135739
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-10-31T15:29:47Z

add debug log

commit b55ab47b819dec90b18b8d57df5978aae0496e11
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-10-31T15:41:00Z

remove log

commit 64ae04f0846b6fcdc851e98a1df71e486bdf7762
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-10-31T15:43:44Z

checkstyle

commit bc16ae2ff89e63f71a050483bffb6d8a4389acd0
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-11-01T08:37:03Z

change the location of register kafka metrics to flink

commit 6fdf8e082669bd69fb730c32c5755660c59d2ab3
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-11-01T08:50:12Z

checkstyle

commit df2620926077c307510baaf74f0d10bf34fe6a1c
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-11-01T09:19:57Z

use specific version poll method

commit c7f44b99911665c974706c6025f69aa097657494
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-11-01T09:32:00Z

method signature

commit b41be18914c0ad8800f6faa30f1fcb0b995e40c0
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-11-01T13:52:39Z

remove callbridge invoke

commit c0dea5068cbb04763265b8f7dc6d80fc4b7cff49
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-11-01T14:29:31Z

just for test

commit e3df3a0705329d4e19f03a18b412e03664a62c9c
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-11-01T15:06:44Z

judge poll success

commit 3dbfa26ee6b46e6a1a6d708dd5bb759ff86014c8
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-11-01T15:47:07Z

judge ConsumerRecords not empty

commit 7f1f653e6346f0e09cf0582d312ae10d223ba92a
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-11-01T15:54:09Z

checkstyle

commit 7828945af3e560e782ee12f0cd11018d3f4e8dbf
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-11-01T16:11:14Z

add flag to judge whether kafka has been registered

commit 3dbd601ae20d1c5163a01e20b991b175f1180aff
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-11-01T16:15:24Z

doc format

commit f9b8fd4e2c9fc488456b141158d239ce2386a854
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-11-01T22:35:16Z

add metrics exist judge and remove unsed code

commit c14feacbe7db945f313de4a39dde13ecc1825924
Author: minwenjun <minwen...@

[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...

2017-11-01 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4935
  
cc @zentol @tzulitai  please help review the code.


---


[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...

2017-11-07 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4935
  
Hi @tzulitai, could you take a look at this again :-) ?


---


[GitHub] flink issue #4960: Update version to 1.5-SNAPSHOT

2017-11-07 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4960
  
🎉 


---


[GitHub] flink pull request #4935: [Flink-7945][Metrics]Fix per partition-l...

2017-11-02 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4935#discussion_r148530767
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -245,6 +238,23 @@ public void run() {
 			if (records == null) {
 				try {
 					records = consumer.poll(pollTimeout);
+					// register Kafka's very own metrics in Flink's metric reporters
+					if (useMetrics && !records.isEmpty()) {
+						// register Kafka metrics to Flink
+						Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+						if (metrics == null) {
+							// MapR's Kafka implementation returns null here.
+							log.info("Consumer implementation does not support metrics");
+						} else {
+							// we have Kafka metrics, register them
+							for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
--- End diff --

Yes, I agree with you that this is not the best way to solve it. What do you think about trying to register the Kafka metrics at the beginning of the job for several attempts, configurable via `properties`? Once beyond that count, we would not run the registration loop anymore~
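
For example, configuring it could look something like this (the retry property key is hypothetical, just to show the shape; the standard Kafka properties are unchanged):

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaSourceExample {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "test-group");
		// hypothetical knob: how many of the first polls retry the metric registration
		props.setProperty("flink.kafka.metrics.register.retries", "10");

		FlinkKafkaConsumer010<String> source =
			new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
	}
}
```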


---


[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...

2017-11-03 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4935
  
Updated the code according to the comments. Ping @tzulitai.


---


[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...

2017-10-24 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4665
  
Hi @zentol, can you help merge this PR? Has it been forgotten?


---


[GitHub] flink issue #4878: [FLINK-7895][hotfix][docs]Fix error in example in get lat...

2017-10-24 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4878
  
Yes, I will close this one ~


---


[GitHub] flink pull request #4878: [FLINK-7895][hotfix][docs]Fix error in example in ...

2017-10-24 Thread Aitozi
Github user Aitozi closed the pull request at:

https://github.com/apache/flink/pull/4878


---


[GitHub] flink pull request #4878: [FLINK-7895][hotfix][docs]Fix error in example in ...

2017-10-20 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/4878

[FLINK-7895][hotfix][docs]Fix error in example in get late message in window doc

* The getSideOutput API is only available on the SingleOutputStreamOperator class and is not part of the base class





You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-7895

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4878.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4878


commit ce09fb63138df9fe00af1281e8ee41cc0ad5bb45
Author: minwenjun <minwen...@didichuxing.com>
Date:   2017-10-20T20:11:58Z

fix error in example in get late message in window doc




---


[GitHub] flink pull request #6080: [Flink-9443]Remove unused parameter in generateNod...

2018-05-26 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6080

[Flink-9443]Remove unused parameter in generateNodeLocalHash

Since Flink 1.2, StreamGraphHasherV2 is used to generate the hashes. The method generateNodeLocalHash no longer uses information like the parallelism or the userFunction, so those parameters should be removed.
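
Roughly, the change just drops the unused parameters (a sketch of the before/after shape, not the exact diff):

```java
// before (sketch): the StreamNode carrying parallelism/userFunction info was
// accepted but never read
// private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id)

// after (sketch): the local hash only depends on the node's position
private void generateNodeLocalHash(Hasher hasher, int id) {
	// include only the node's position in the hash; this is enough to
	// disambiguate nodes that would otherwise hash identically
	hasher.putInt(id);
}
```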

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-9443

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6080.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6080


commit c174cb29667e27abfb4fd74b0d5bca894f9da2d9
Author: minwenjun <minwenjun@...>
Date:   2018-05-26T10:27:07Z

Remove unused parameter in generateNodeLocalHash




---


[GitHub] flink issue #6124: [FLINK-8914][CEP]Fix wrong semantic when greedy pattern i...

2018-06-08 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6124
  
Got it. Thanks for your response @dawidwys; I'm waiting for your further comments on this PR. I'm glad to contribute to the CEP library and really hope to get more ideas from your discussions about CEP, thanks.


---


[GitHub] flink issue #6171: [FLINK-9593] Unified After Match semantics with SQL MATCH...

2018-06-15 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6171
  
Hi @dawidwys, can you explain a little how the `AfterMatch` semantics differ from the previous implementation? I read the doc and feel a little confused. thx ;-)


---


[GitHub] flink pull request #6168: [FLINK-9588]Reused context with same computation s...

2018-06-14 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6168

[FLINK-9588]Reused context with same computation state calculate

## What is the purpose of the change

Currently CEP calls checkFilterCondition with a newly created ConditionContext for each edge, which results in repeated getEventsForPattern calls due to the re-initialization of `shouldUpdate`.
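
The gist of the change, as a sketch (names follow NFA.java, but this is not the exact diff): create one ConditionContext per computation state and hand it to every edge's condition check, so the events cached by getEventsForPattern are reused across edges.

```java
// one context per computation state, instead of one per outgoing edge
ConditionContext context = new ConditionContext(this, sharedBuffer, computationState);

for (StateTransition<T> transition : state.getStateTransitions()) {
	// all edges share the cache inside 'context'; its 'shouldUpdate' flag is
	// initialized once, so getEventsForPattern() is not re-read per edge
	if (checkFilterCondition(context, transition.getCondition(), event)) {
		// ... follow the TAKE / IGNORE / PROCEED branch ...
	}
}
```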

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink context-reuse

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6168.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6168


commit ed71ac4407de9d8163efa8c334d9ac0e63e47069
Author: minwenjun 
Date:   2018-06-14T14:24:02Z

[FLINK-9588]Reused context with same computation state calculate




---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-15 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r195737755
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
 
 	final Stack> states = new Stack<>();
 	states.push(state);
+	ConditionContext context = new ConditionContext(this, sharedBuffer, computationState);
--- End diff --

I think it needs the `conditionContext` and the `computationState`, and we should replace the `sharedBuffer` parameter with the `conditionContext`.


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-15 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r195738359
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
 
 	final Stack> states = new Stack<>();
 	states.push(state);
+	ConditionContext context = new ConditionContext(this, sharedBuffer, computationState);
--- End diff --

agree


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-15 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r195745115
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -609,7 +611,7 @@ public void close() throws Exception {
 				startTimestamp);
 
 			//check if newly created state is optional (have a PROCEED path to Final state)
-			final State finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
+			final State finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent());
--- End diff --

I think the sharedBuffer has been changed by the `TAKE` branch, so the conditionContext should be different here.


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-15 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r195792448
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -609,7 +611,7 @@ public void close() throws Exception {
 				startTimestamp);
 
 			//check if newly created state is optional (have a PROCEED path to Final state)
-			final State finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
+			final State finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent());
--- End diff --

Thinking it over again: the content of the `sharedBuffer` makes a difference to the result of `getEventsForPattern`, so the result should be updated when the `sharedBuffer` changes. But I think we only have to reset the `shouldUpdate` flag to `true` here rather than create a context again, right? @dawidwys


---


[GitHub] flink issue #6162: [FLINK-9579][CEP]Remove unneeded clear on elementQueueSta...

2018-06-14 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6162
  
Thanks for your review; removed the same block in `onProcessingTime`.


---


[GitHub] flink pull request #6162: [FLINK-9579][CEP]Remove unneeded clear on elementQ...

2018-06-14 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6162

[FLINK-9579][CEP]Remove unneeded clear on elementQueueState

## What is the purpose of the change

Remove the unneeded clear on elementQueueState: when sortedTimestamps is empty, the elements in elementQueueState have all been removed already, so we don't need to clear it again and waste time on a RocksDB operation.
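
As a sketch of the code path in question (simplified from the operator's onEventTime; the real surrounding code differs):

```java
// drain all buffered elements whose timestamp is at or below the watermark
PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();

while (!sortedTimestamps.isEmpty()
		&& sortedTimestamps.peek() <= timerService.currentWatermark()) {
	long timestamp = sortedTimestamps.poll();
	// ... process the elements buffered for this timestamp ...
	elementQueueState.remove(timestamp); // each key is removed as it is drained
}

// once the loop has drained everything, every key is already gone, so a
// trailing elementQueueState.clear() is a redundant state-backend operation
```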



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink remove-clear

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6162.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6162


commit 62a1a506cf8dab263a247d81fa7092eaa0743624
Author: minwenjun 
Date:   2018-06-14T06:20:42Z

[FLINK-9579][CEP]Remove unneeded clear on elementQueueState




---


[GitHub] flink issue #6162: [FLINK-9579][CEP]Remove unneeded clear on elementQueueSta...

2018-06-15 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6162
  
Is this OK? @dawidwys 


---


[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...

2018-06-15 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6168
  
please help review this pr @dawidwys , thx.


---


[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...

2018-05-30 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6104

[FLINK-9476]Emit late elements in CEP as sideOutPut

Currently, when using event time with the CEP library, elements that arrive later than the watermark are dropped; we can instead emit them to a side output with an OutputTag.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-9476

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6104.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6104


commit c0b04fadacd3e9f3f403b3adfdbadf8d4aac79e4
Author: minwenjun 
Date:   2018-05-30T15:32:15Z

Add loseDataOutputTag in cep deal with event time dropped data

commit 373e376fc182c32fe69765aa564e93057954ff44
Author: minwenjun 
Date:   2018-05-30T15:50:01Z

add scala api




---


[GitHub] flink issue #6104: [FLINK-9476]Emit late elements in CEP as sideOutPut

2018-05-30 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6104
  
@bowenli86 thanks for the review, I have fixed the errors according to the comments and added a unit test in CEPITCase, please review it again. cc @kl0u


---


[GitHub] flink issue #6080: [FLINK-9443]Remove unused parameter in generateNodeLocalH...

2018-05-29 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6080
  
@aljoscha please help review this, thanks


---


[GitHub] flink pull request #6109: [FLINK-9483] 'Building Flink' doc doesn't highligh...

2018-06-03 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6109#discussion_r192628465
  
--- Diff: docs/start/building.md ---
@@ -50,7 +50,11 @@ mvn clean install -DskipTests
 
 This instructs [Maven](http://maven.apache.org) (`mvn`) to first remove all existing builds (`clean`) and then create a new Flink binary (`install`).
 
-To speed up the build you can skip tests, checkstyle, and JavaDocs: `mvn clean install -DskipTests -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true`.
+To speed up the build you can skip tests, checkstyle, and JavaDocs:
+
+{% highlight bash %}
+mvn clean install -DskipTests -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true
--- End diff --

A little question: what is the meaning of `-Dfast`?


---


[GitHub] flink issue #6059: [Flink-9418] Migrate SharedBuffer to use MapState

2018-06-04 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6059
  
@dawidwys I'm sorry, I don't have a systematic testing tool. I'm working on our internal dynamic CEP, where the `CEPOperator` can process several `NFA`s when it receives an element. I ran into a backpressure problem and simply tested whether this patch could overcome it.
With this patch applied, a CEPOperator with parallelism 200 can handle about 7000 qps of input with about 30 patterns (rules) without backpressure. The type of pattern may influence the numbers, so the data I provide here may not be very useful.


---


[GitHub] flink issue #6104: [FLINK-9476]Emit late elements in CEP as sideOutPut

2018-06-04 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6104
  
thx, can you help merge this @dawidwys?


---


[GitHub] flink issue #6104: [FLINK-9476]Emit late elements in CEP as sideOutPut

2018-06-04 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6104
  
@dawidwys fixed the flaws according to your suggestion ;-)


---


[GitHub] flink pull request #6059: [Flink-9418] Migrate SharedBuffer to use MapState

2018-06-02 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6059#discussion_r192558813
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventWrapper.java ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Thin wrapper around user event that adds a lock.
+ *
+ * @param <T> user event type
+ */
+public class EventWrapper<T> {
--- End diff --

Why does this class use the same name as the inner class in NFA.java? Is this intended? It is a little confusing.


---


[GitHub] flink pull request #6111: [FLINK-9504]Change the log level of checkpoint dur...

2018-06-02 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6111

[FLINK-9504]Change the log level of checkpoint duration to debug

Currently, every checkpoint logs the per-partition/parallel-instance time cost for both the OperatorStateBackend and the KeyedStateBackend, which often leads to too much log output in the TaskManager. I think the log level should be changed to debug; is that OK? @StephanEwen

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-9504

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6111.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6111


commit 8368e2e82ad6e0605bfbc4692cde3cc09e431816
Author: minwenjun 
Date:   2018-06-02T13:05:04Z

change the log level of checkpoint cost to debug




---


[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...

2018-06-01 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6104#discussion_r192428178
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---
@@ -54,6 +54,12 @@
 	// comparator to sort events
 	private final EventComparator comparator;
 
+	/**
+	 * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
--- End diff --

fixed


---


[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...

2018-06-01 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6104#discussion_r192428106
  
--- Diff: docs/dev/libs/cep.md ---
@@ -1524,7 +1524,52 @@ In `CEP` the order in which elements are processed 
matters. To guarantee that el
 
 To guarantee that elements across watermarks are processed in event-time 
order, Flink's CEP library assumes
 *correctness of the watermark*, and considers as *late* elements whose 
timestamp is smaller than that of the last
-seen watermark. Late elements are not further processed.
+seen watermark. Late elements are not further processed. Also, you can 
specify a sideOutput tag to collect the late elements come after the last seen 
watermark, you can use it like this.
+
+
+
+
+{% highlight java %}
+PatternStream patternStream = CEP.pattern(input, pattern);
+
+OutputTag lateDataOutputTag = new 
OutputTag("late-data""){};
+
+OutputTag outputTag = new OutputTag("side-output""){};
--- End diff --

Removed.


---


[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...

2018-06-01 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6104#discussion_r192428038
  
--- Diff: docs/dev/libs/cep.md ---
@@ -1524,7 +1524,52 @@ In `CEP` the order in which elements are processed matters. To guarantee that el
 
 To guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes
 *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last
-seen watermark. Late elements are not further processed.
+seen watermark. Late elements are not further processed. Also, you can specify a side-output tag to collect the late elements that arrive after the last seen watermark. You can use it like this:
+
+{% highlight java %}
+PatternStream patternStream = CEP.pattern(input, pattern);
+
+OutputTag lateDataOutputTag = new OutputTag("late-data"){};
+
+OutputTag outputTag = new OutputTag("side-output"){};
+
+SingleOutputStreamOperator result = patternStream
+    .sideOutputLateData(lateDataOutputTag)
+    .select(
+        new PatternTimeoutFunction() {...},
+        outputTag,
+        new PatternSelectFunction() {...}
+    );
+
+DataStream lateData = result.getSideOutput(lateDataOutputTag);
+{% endhighlight %}
+
+{% highlight scala %}
+val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
+
+val lateDataOutputTag = OutputTag[String]("late-data")
+
+val result: SingleOutputStreamOperator[ComplexEvent] = patternStream
+    .sideOutputLateData(lateDataOutputTag)
+    .select(outputTag){
--- End diff --

OK, I removed the timeout tag to reduce confusion, and fixed a small error in the previous doc (the order of the parameters was wrong).


---


[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...

2018-06-01 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6104#discussion_r192428502
  
--- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -383,6 +386,100 @@ public String select(Map> pattern) {
 		env.execute();
 	}
 
+	@Test
+	public void testSimpleKeyedPatternEventTimeWithSideOutput() throws Exception {
--- End diff --

Moved the test case to CepOperatorTest, please help review again, thanks ;)


---


[GitHub] flink issue #6059: [Flink-9418] Migrate SharedBuffer to use MapState

2018-06-02 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6059
  
After going through the whole change, I think it is an extremely good feature 👍, and I will apply this patch to our internal library to test how much the performance improves.
A little question: why did the Travis run fail? I checked the failed job's log but couldn't find the reason.


---


[GitHub] flink pull request #6124: [FLINK-8914][CEP]Fix wrong semantic when greedy pa...

2018-06-05 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6124

[FLINK-8914][CEP]Fix wrong semantic when greedy pattern is the head of the pattern

## What is the purpose of the change

As described in the JIRA issue [FLINK-8914](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-8914), there is something wrong with `greedy` when it is the head of the pattern. The `NFA` processes each `ComputationState` and produces a new start `ComputationState`, so when it runs into a greedy match, other start runs can also be set up.

## Brief change log

  - *Add a new StateType `Greedy` for conveniently distinguishing greedy states in computations*
  - *Remove the redundant start state during processing*


## Verifying this change

Added two UTs in `GreedyITCase`.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink greedyfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6124


commit 76c1a4516b4bc98043d944335cc7a0aacd359278
Author: minwenjun 
Date:   2018-06-05T16:07:55Z

Fix wrong semantic when greedy pattern is the head of the pattern




---


[GitHub] flink issue #6124: [FLINK-8914][CEP]Fix wrong semantic when greedy pattern i...

2018-06-05 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6124
  
@dawidwys please help review this PR when you are free, thx


---


[GitHub] flink issue #6111: [FLINK-9504]Change the log level of checkpoint duration t...

2018-06-04 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6111
  
Thanks @StephanEwen for the review. As you mentioned, I only set the log level in the state backends to `debug`; is there something wrong with my pull request, or did I misunderstand your meaning?


---


[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...

2018-06-26 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Using `entries#putAll` in `flushCache` caused the access counts in `NFAStateAccessTest` to increase; I will check it locally. This Travis run will fail.


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198151182
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
 
+	// Cache related method
+
+	/////////////////////////////////////////
+	//  Put
+	/////////////////////////////////////////
+
+	/**
+	 * Put an event to cache.
+	 * @param eventId id of the event
+	 * @param event event body
+	 */
+	private void cacheEvent(EventId eventId, Lockable event) {
+		this.eventsBufferCache.put(eventId, event);
+	}
+
+	/**
+	 * Put a ShareBufferNode to cache.
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	private void cacheEntry(NodeId nodeId, Lockable entry) {
+		this.entryCache.put(nodeId, entry);
+	}
+
+	/////////////////////////////////////////
+	// Get
+	/////////////////////////////////////////
+
+	/**
+	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+	 * @param nodeId id of the event
+	 * @return SharedBufferNode
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	private Lockable getEntry(NodeId nodeId) throws Exception {
+		Lockable entry = entryCache.get(nodeId);
+		return entry != null ? entry : entries.get(nodeId);
+	}
+
+	private Lockable getEvent(EventId eventId) throws Exception {
+		Lockable event = eventsBufferCache.get(eventId);
+		return event != null ? event : eventsBuffer.get(eventId);
+	}
+
+	/**
+	 * Flush the event and node in map to state.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void flushCache() throws Exception {
+		entryCache.forEach((k, v) -> {
+			try {
+				entries.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
+			}
+		});
+
+		eventsBufferCache.forEach((k, v) -> {
+			try {
+				eventsBuffer.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
--- End diff --

Get it.


---


[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...

2018-06-26 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Fixed the increased state-access counts in `NFAStateAccessTest` by adding an `isEmpty` check before updating the state.


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-23 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6205

[FLINK-9642]Reduce the count to deal with state during a CEP process

## What is the purpose of the change

With the rework of the SharedBuffer in FLINK-9418, the lock & release operations go through the state backend, unlike the previous version, which read the whole SharedBuffer state into memory. I think we can add a cache in the SharedBuffer holding the Lockable objects, to track the reference changes during one NFA process call; this reduces the number of state accesses when several events point to the same node. The result is flushed to the MapState at the end of processing (see the sketch after the change log).


## Brief change log

- add the eventsBufferCache and entryCache
- flush the cache after one turn of processing
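
Conceptually, the per-element flow becomes (a sketch of the idea; the `process` signature stands in for the NFA advance, and flushCache is the method this PR adds):

```java
// during nfa.process(...), SharedBuffer reads go through the HashMap caches
// (entryCache / eventsBufferCache) and fall back to the MapStates on a miss;
// lock/release bookkeeping only mutates the cached copies
nfa.process(sharedBuffer, event, timestamp); // illustrative call, not the exact signature

// at the end of processing one element, write the net result back to the
// MapStates once, instead of once per lock/release
sharedBuffer.flushCache();
```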

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink onceQueryCache

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6205.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6205


commit 184ec24474ee5b8a1c9f932286ef4aed4f1dabd6
Author: minwenjun 
Date:   2018-06-23T12:56:55Z

[FLINK-9642]Reduce the count to deal with state during a CEP process




---


[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...

2018-06-24 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Hi @zhangminglei, thanks for your review. I had only checked SharedBufferTest locally before; the error in Travis means the number of state accesses (reads and writes) is now less than before, which is the purpose of this PR, and I fixed the expected counts. cc @dawidwys


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-24 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629310
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
 
+	// Cache related method
+
+	/////////////////////////////////////////
+	//  Put
+	/////////////////////////////////////////
+
+	/**
+	 * Put a event to cache.
+	 * @param eventId id of the event
--- End diff --

fixed


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-24 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629218
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -75,6 +76,9 @@
 	private MapState eventsCount;
 	private MapState> entries;
 
+	private HashMap> eventsBufferCache = new HashMap<>();
+	private HashMap> entryCache = new HashMap<>();
--- End diff --

agree and fixed


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-23 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629205
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
 
+	// Cache related method
+
+	/////////////////////////////////////////
+	//  Put
+	/////////////////////////////////////////
+
+	/**
+	 * Put a event to cache.
+	 * @param eventId id of the event
+	 * @param event event body
+	 */
+	private void cacheEvent(EventId eventId, Lockable event) {
+		this.eventsBufferCache.put(eventId, event);
+	}
+
+	/**
+	 * Put a ShareBufferNode to cache.
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	private void cacheEntry(NodeId nodeId, Lockable entry) {
+		this.entryCache.put(nodeId, entry);
+	}
+
+	/////////////////////////////////////////
+	// Get
+	/////////////////////////////////////////
+
+	/**
+	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+	 * @param nodeId id of the event
--- End diff --

it means `if and only if`.


---


[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...

2018-06-24 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
The travis error this time seems unrelated.


---


[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...

2018-06-19 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6168
  
Is it OK now? @dawidwys


---


[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...

2018-06-26 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Does this look OK now? Ping @sihuazhou @dawidwys


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198142501
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
 
+	// Cache related method
+
+	/////////////////////////////////////////
+	//  Put
+	/////////////////////////////////////////
+
+	/**
+	 * Put an event to cache.
+	 * @param eventId id of the event
+	 * @param event event body
+	 */
+	private void cacheEvent(EventId eventId, Lockable event) {
+		this.eventsBufferCache.put(eventId, event);
+	}
+
+	/**
+	 * Put a ShareBufferNode to cache.
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	private void cacheEntry(NodeId nodeId, Lockable entry) {
+		this.entryCache.put(nodeId, entry);
+	}
+
+	/////////////////////////////////////////
+	// Get
+	/////////////////////////////////////////
+
+	/**
+	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+	 * @param nodeId id of the event
+	 * @return SharedBufferNode
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	private Lockable getEntry(NodeId nodeId) throws Exception {
+		Lockable entry = entryCache.get(nodeId);
+		return entry != null ? entry : entries.get(nodeId);
+	}
+
+	private Lockable getEvent(EventId eventId) throws Exception {
+		Lockable event = eventsBufferCache.get(eventId);
+		return event != null ? event : eventsBuffer.get(eventId);
+	}
+
+	/**
+	 * Flush the event and node in map to state.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void flushCache() throws Exception {
+		entryCache.forEach((k, v) -> {
--- End diff --

OK. Does this benefit from the `RocksDBWriteBatchWrapper` when using `putAll`?


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198143091
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
 
+	// Cache related method
+
+	/////////////////////////////////////////
+	//  Put
+	/////////////////////////////////////////
+
+	/**
+	 * Put an event to cache.
+	 * @param eventId id of the event
+	 * @param event event body
+	 */
+	private void cacheEvent(EventId eventId, Lockable event) {
+		this.eventsBufferCache.put(eventId, event);
+	}
+
+	/**
+	 * Put a ShareBufferNode to cache.
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	private void cacheEntry(NodeId nodeId, Lockable entry) {
+		this.entryCache.put(nodeId, entry);
+	}
+
+	/////////////////////////////////////////
+	// Get
+	/////////////////////////////////////////
+
+	/**
+	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+	 * @param nodeId id of the event
+	 * @return SharedBufferNode
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	private Lockable getEntry(NodeId nodeId) throws Exception {
+		Lockable entry = entryCache.get(nodeId);
+		return entry != null ? entry : entries.get(nodeId);
+	}
+
+	private Lockable getEvent(EventId eventId) throws Exception {
+		Lockable event = eventsBufferCache.get(eventId);
+		return event != null ? event : eventsBuffer.get(eventId);
+	}
+
+	/**
+	 * Flush the event and node in map to state.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void flushCache() throws Exception {
+		entryCache.forEach((k, v) -> {
+			try {
+				entries.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
+			}
+		});
+
+		eventsBufferCache.forEach((k, v) -> {
+			try {
+				eventsBuffer.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
--- End diff --

But I don't know how to handle the checked exception inside a stream api 
lambda in java8; do you have a better way to deal with this situation? Thanks.
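
For the record, one common workaround is a small adapter that rethrows the 
checked exception as an unchecked one while keeping the cause. A minimal 
sketch in plain java8 (`ThrowingBiConsumer` and `unchecked` are illustrative 
names, not existing Flink api):

```
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public class UncheckedForEach {

    /** A BiConsumer that is allowed to throw a checked exception. */
    @FunctionalInterface
    interface ThrowingBiConsumer<K, V> {
        void accept(K k, V v) throws Exception;
    }

    /** Adapts a throwing consumer so it can be passed to Map#forEach. */
    static <K, V> BiConsumer<K, V> unchecked(ThrowingBiConsumer<K, V> consumer) {
        return (k, v) -> {
            try {
                consumer.accept(k, v);
            } catch (Exception e) {
                // keep the original exception as the cause instead of dropping it
                throw new RuntimeException(e);
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        cache.put("k", "v");
        // e.g. entryCache.forEach(unchecked(entries::put)) for a throwing put
        cache.forEach(unchecked((k, v) -> System.out.println(k + "=" + v)));
    }
}
```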


---


[GitHub] flink pull request #6234: [FLINK-9431]Introduce time bounded condition to ce...

2018-07-01 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6234

[FLINK-9431]Introduce time bounded condition to cep

## What is the purpose of the change

In cep, events currently drive the transformations of the NFA; I think the 
time factor should also be taken into account in some scenarios.

When a key's data is not endless, suppose we want to emit a match for the 
following pattern once `B` has been present for ten seconds:

```
Pattern.begin("A").followedBy("B").notFollowedBy("C")
``` 
We cannot emit the result, because no branch can lead to the `Final State`. I 
think we can add a `TimeEnd` state to describe a pattern that accepts a time 
condition, evaluated by processing time / event time, which is compared against 
the timestamp of an element we have seen before, as sketched below.
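
A hedged sketch of how such a pattern might read; `timeEnd` is a hypothetical 
name (this branch is called `timeEnd-state`), not a finalized api:

```
// Hypothetical: emit the A->B partial match as a full match when no C has
// arrived within ten seconds after B, judged by the advancing event time /
// processing time rather than by a newly arriving element.
Pattern.<Event>begin("A")
    .followedBy("B")
    .notFollowedBy("C")
    .timeEnd(Time.seconds(10));
```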

As described in the issue link, there are two main reasons why I introduce 
this feature:

1.  `notFollowedBy` can't be at the end of the pattern. 
2.  `within` only compares against the element at the start of the match, and 
some keys' data may not be endless, so we have to evaluate conditions not only 
on events but also on time.

## Brief change log

1.  Add a method to `IterativeCondition` to distinguish event-driven conditions 
from time-driven conditions.
2.  On `advanceTime`, not only prune the expired elements but also evaluate the 
time-bounded conditions.


## Verifying this change

This change is already covered by existing cep tests; it may need a little more 
coverage for the new api.

This change added tests and can be verified as follows:


## Documentation

  - Does this pull request introduce a new feature? (yes)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink timeEnd-state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6234.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6234


commit b1aa992a97c8eac818e57c3d2f82be76957052d0
Author: minwenjun 
Date:   2018-07-01T14:41:44Z

[FLINK-9431]Introduce time bounded condition to cep




---


[GitHub] flink pull request #5405: [FLINK-8477][Window]Add api to support user to ski...

2018-02-02 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/5405

[FLINK-8477][Window]Add api to support user to skip several broken windows

In production, some applications (e.g. monitoring) need accurate data, but 
consider this scenario: if we start a job at 10:45:20 with a 1-minute window 
aggregate, we may produce broken data for the 10:45 window, which can be 
misleading. We can provide a user api that lets users choose to skip several 
windows, so they can avoid the broken data themselves (see the sketch below).
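
A hedged sketch of the intended usage; `skipBrokenWindows` is an illustrative 
name, not necessarily the method added by this patch:

```
// Hypothetical: a job started at 10:45:20 skips the partial 10:45 window
// instead of emitting a broken 1-minute aggregate for it.
stream
    .keyBy(event -> event.getKey())
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .skipBrokenWindows(1) // illustrative: number of windows to skip after start
    .sum("value");
```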

## Brief change log

  - add a streaming api 




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-8477

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5405.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5405


commit 9c6b77077bac2e0dfa4ea3bddf11bd27831ba3e4
Author: minwenjun <minwenjun@...>
Date:   2018-02-02T15:46:11Z

Add api to support user to skip serval broken window




---


[GitHub] flink issue #5405: [FLINK-8477][Window]Add api to support user to skip serva...

2018-02-02 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/5405
  
cc @aljoscha please help review this patch.

![image](https://user-images.githubusercontent.com/9486140/35761522-6e00f4b8-08c4-11e8-8063-7ec015802428.png)
See the picture above: when a user chooses to run without a checkpoint to 
avoid catching up on data after a crash, and uses kafka#setStartFromLatest to 
consume the latest data, then without the skip api it can produce broken data, 
which may trigger alerts in a monitoring scenario. If users want to skip the 
broken windows, they can have the choice to skip several windows after the 
first fire.



---


[GitHub] flink issue #5405: [FLINK-8477][Window]Add api to support user to skip serva...

2018-02-11 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/5405
  
Hi @aljoscha , you have mentioned two points: 
1. Events may arrive out of order in event-time processing. 
2. We can use a WindowFunction or ProcessWindowFunction to filter several 
windows by checking the window's start time and end time.

I have some different ideas: 
1. When we deal with an out-of-order event-time stream, we may specify a 
maxOutOfOrder bound to avoid skipping too many late elements. When the job 
starts or restarts, the maxNumOfWindow to skip can be set to 
maxOutOfOrder / (the length of the tumbling window), so that late elements 
will not produce incorrect results. The number of windows that need to be 
skipped depends on the degree of out-of-orderness (see the sketch after this 
list).
2. We need to skip the several broken windows of data, and we don't know which 
windows are broken; we can just detect which window fires first, and treat the 
several windows after it as broken too. The number should vary with the 
production setup (according to the maxOutOfOrder and the length of the window).
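
A worked example of the sizing rule in point 1, as a minimal sketch with 
illustrative numbers:

```
import java.time.Duration;

public class SkipSizing {
    public static void main(String[] args) {
        // Rule from point 1: skip maxOutOfOrder / windowLength tumbling
        // windows after the first fire, so late elements cannot corrupt
        // the emitted results.
        Duration maxOutOfOrder = Duration.ofMinutes(3); // assumed out-of-orderness bound
        Duration windowLength = Duration.ofMinutes(1);  // tumbling window size
        long windowsToSkip = maxOutOfOrder.toMillis() / windowLength.toMillis();
        System.out.println(windowsToSkip); // 3
    }
}
```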


---


[GitHub] flink issue #5405: [FLINK-8477][Window]Add api to support user to skip serva...

2018-02-09 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/5405
  
ping @aljoscha 


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-07-08 Thread Aitozi
Github user Aitozi closed the pull request at:

https://github.com/apache/flink/pull/6168


---


[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...

2018-07-10 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6224
  
Aha, I meant to improve the performance of the windowOperator. In the scenario 
mentioned in the issue, this PR can also avoid that bug, lucky hit ;-)


---


[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...

2018-07-10 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Resolved the conflicts; please help review when you are free @dawidwys .


---


[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...

2018-07-11 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6224
  
OK 👌


---


[GitHub] flink issue #6111: [FLINK-9504]Change the log level of checkpoint duration t...

2018-07-11 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6111
  
Hi @StephanEwen , are there any changes that should be made to this PR?


---


[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...

2018-07-06 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6168
  
Hi @dawidwys , since this commit has been merged in, does this PR need to be 
closed by me?


---


[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...

2018-07-06 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6224
  
ping @aljoscha , could you help review it? I'd like to hear your opinion on 
this PR too. thx


---


[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...

2018-07-06 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Could you take a look at this PR @dawidwys ?


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-07-11 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r201895445
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+   AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+   implements InternalListState<K, N, T> {
+   TtlListState(
+   InternalListState<K, N, TtlValue<T>> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List<T> values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List<T> values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable<T> get() throws Exception {
+   Iterable<TtlValue<T>> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List<TtlValue<T>> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable<TtlValue<T>> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+   List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection<N> sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List<T> getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private <E> List<E> collect(Iterable<E> iterable) {
--- End diff --

Hi @azagrebin , a little doubt about your statement that we

> return Iterable and avoid querying backend if not needed

But when dealing with ListState, hasn't `original.get()` already queried the 
original `Iterable` from RocksDB? Does this approach just lazily iterate the 
elements in memory?
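
For illustration, a minimal sketch of the distinction being discussed, under 
the assumption that the backend read is eager while per-element work stays 
lazy (all names are illustrative):

```
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class LazyViewSketch {
    public static void main(String[] args) {
        // The backend query is assumed to have already happened here ...
        List<String> fetched = Arrays.asList("a", "b", "c");

        // ... so wrapping it in another Iterable only defers per-element work
        // (e.g. deserialization, expiry filtering), not the initial state read.
        Iterable<String> lazyView = () -> new Iterator<String>() {
            private final Iterator<String> it = fetched.iterator();

            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public String next() {
                return it.next().toUpperCase(); // per-element work runs lazily
            }
        };

        lazyView.forEach(System.out::println); // prints A, B, C
    }
}
```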


---


[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

2018-07-11 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6186
  
Hi, after reading the whole implementation, I found that state is only expired 
when it is accessed. When stale data is stored in state and is never queried 
again, how can it be expired? Or is there ongoing work for this? 
@azagrebin 


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-07-12 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r201971238
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+   AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+   implements InternalListState<K, N, T> {
+   TtlListState(
+   InternalListState<K, N, TtlValue<T>> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List<T> values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List<T> values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable<T> get() throws Exception {
+   Iterable<TtlValue<T>> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List<TtlValue<T>> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable<TtlValue<T>> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+   List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection<N> sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List<T> getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private <E> List<E> collect(Iterable<E> iterable) {
--- End diff --

Got it, thanks for your explanation.


---


[GitHub] flink pull request #:

2018-07-09 Thread Aitozi
Github user Aitozi commented on the pull request:


https://github.com/apache/flink/commit/8c89f3c6b5ebd0334176d9e7e57b38b4d39a594a#commitcomment-29640344
  
Hi, did you discuss the logic change of the Trigger in this PR: 
https://github.com/apache/flink/pull/6224


---


[GitHub] flink pull request #6224: [FLINK-9687]Delay the state fetch only when the tr...

2018-07-09 Thread Aitozi
Github user Aitozi closed the pull request at:

https://github.com/apache/flink/pull/6224


---


[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...

2018-07-09 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6224
  
Got it, thanks for your explanation ;-). I will close this PR.


---


[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...

2018-07-09 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6224
  
Hi, @aljoscha 
I read the PR for https://issues.apache.org/jira/browse/FLINK-5363, and I think 
it is not the same thing as this PR. That change meant to get the `TriggerResult` 
no matter whether the window contents are empty or not, while I meant to check 
the `TriggerResult` first, to avoid fetching the window state when the trigger 
is not ready to `FIRE`. Fetching the window state is the much more expensive 
part, so I think we can get the triggerResult first, as in the sketch below.
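
A minimal sketch of the proposed ordering, loosely following the shape of 
`WindowOperator#onEventTime` (illustrative, not the actual patch):

```
// Ask the trigger first; only touch the (potentially expensive) window state
// when the trigger actually decides to fire.
TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
    ACC contents = windowState.get(); // state backend access happens only here
    if (contents != null) {
        emitWindowContents(triggerContext.window, contents);
    }
}
if (triggerResult.isPurge()) {
    windowState.clear();
}
```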


---


[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...

2018-07-09 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6224
  
Got it, this change can indeed lead to the situation you mentioned.
> What are the cases where this change leads to an improvement

I just read the code and thought the order was not suitable.

> I think it rarely happens that a timer fires while a window is empty

I think that is not quite the case here, because we fetch the window contents 
each time before getting the trigger result. AFAIK there are `cleanupTimer` and 
`fireTimer` timers in the `internalTimerService` of the `windowOperator`, and 
we only need the window contents for a window's `fireTimer`; in the current 
implementation we extract them for both the `cleanupTimer` and the `fireTimer`.



---


[GitHub] flink issue #3105: [FLINK-4641] [cep] Support branching CEP patterns

2018-03-12 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/3105
  
Why was it closed? Has it been merged in?


---


[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...

2018-06-28 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6224
  
The failure info in the travis error shows that the `checkClusterEmpty` test 
failed. Did this happen due to the reuse of the yarn cluster? It seems 
unrelated to this pull request.


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-22 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r197482701
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -609,7 +611,7 @@ public void close() throws Exception {
startTimestamp);
 
//check if newly created state is 
optional (have a PROCEED path to Final state)
-   final State<T> finalState = 
findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), 
computationState);
+   final State<T> finalState = 
findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, 
computationState), nextState, event.getEvent());
--- End diff --

Yes, you are right. Reading the code again, the `TAKE` branch only puts the 
new `Node` into the sharedBuffer, pointing to the previousNodeId; this indeed 
doesn't affect the result of the current ComputationState's partial match. I 
will take your suggestion.


---


[jira] [Updated] (FLINK-7611) add metrics to measure the data drop by watermark

2017-09-11 Thread aitozi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aitozi updated FLINK-7611:
--
Affects Version/s: 1.3.0

> add metrics to measure the data drop by watermark
> -
>
> Key: FLINK-7611
> URL: https://issues.apache.org/jira/browse/FLINK-7611
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0, 1.3.0
>Reporter: aitozi
>Priority: Minor
>  Labels: patch
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> when using the window operator with event time, data that arrives after 
> window.endtime + allowedLateness will be dropped, but there is no existing 
> metric to measure the number of dropped elements; this value would help to 
> set a correct allowedLateness



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7611) add metrics to measure the data drop by watermark

2017-09-11 Thread aitozi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aitozi updated FLINK-7611:
--
Affects Version/s: 1.2.0

> add metrics to measure the data drop by watermark
> -
>
> Key: FLINK-7611
> URL: https://issues.apache.org/jira/browse/FLINK-7611
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: aitozi
>Priority: Minor
>  Labels: patch
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> when using the window operator with event time, data that arrives after 
> window.endtime + allowedLateness will be dropped, but there is no existing 
> metric to measure the number of dropped elements; this value would help to 
> set a correct allowedLateness



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric

2017-09-16 Thread aitozi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168852#comment-16168852
 ] 

aitozi commented on FLINK-7608:
---

I doubt why it needs to expose the values of p50, p95, p99. I think it doesn't 
need to keep that many values; it just has to reflect the current latency of 
the operator. 

> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7611) add metrics to measure the data drop by watermark

2017-09-11 Thread aitozi (JIRA)
aitozi created FLINK-7611:
-

 Summary: add metrics to measure the data drop by watermark
 Key: FLINK-7611
 URL: https://issues.apache.org/jira/browse/FLINK-7611
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Reporter: aitozi
Priority: Minor


when using the window operator with event time, data that arrives after 
window.endtime + allowedLateness will be dropped, but there is no existing 
metric to measure the number of dropped elements; this value would help to set 
a correct allowedLateness



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7611) add metrics to measure the data drop by watermark

2017-09-12 Thread aitozi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16164008#comment-16164008
 ] 

aitozi commented on FLINK-7611:
---

I have opened the PR, https://github.com/apache/flink/pull/4665 ; anyone who 
cares about this issue can comment on it, thank you.

> add metrics to measure the data drop by watermark
> -
>
> Key: FLINK-7611
> URL: https://issues.apache.org/jira/browse/FLINK-7611
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0, 1.3.0
>Reporter: aitozi
>Priority: Minor
>  Labels: patch
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> when using the window operator with event time, data that arrives after 
> window.endtime + allowedLateness will be dropped, but there is no existing 
> metric to measure the number of dropped elements; this value would help to 
> set a correct allowedLateness



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7945) Per partition-lag metric lost in kafka connector

2017-11-01 Thread aitozi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aitozi updated FLINK-7945:
--
Summary: Per partition-lag metric lost in kafka connector   (was: 
kafka-connector11 use kafkaConsumer0.9 caused it lost the important metric in 
kafka consumer clients11)

> Per partition-lag metric lost in kafka connector 
> -
>
> Key: FLINK-7945
> URL: https://issues.apache.org/jira/browse/FLINK-7945
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Metrics
>Affects Versions: 1.4.0, 1.3.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> When I wanted to add a per-partition kafka lag metric, I found that 
> Kafka-connector11 used the Kafka09Fetcher, and the Kafka09Fetcher uses the 
> 0.9 version of the kafka client, which caused new metrics such as the 
> per-partition consumer lag to be lost.  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

