[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4665 @aljoscha Yes, I reviewed the code today; it judges each window for lateness, so the approach I used previously counted more lost data than actually occurred. I have fixed the error and re-pushed, please help review the code again, thanks. ---
[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4665 @aljoscha I agree the name should be "numLateElementsDropped". And do you mean that my result should subtract the number of elements that are skipped and late but go to the side output? ---
[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4665 Hello, can this be merged @aljoscha @zentol ? ---
[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/4665#discussion_r140634868 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -405,6 +411,8 @@ public void merge(W mergeResult, // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) { --- End diff -- It looks better this way, I will adjust it here :) ---
[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/4665#discussion_r140635202 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java --- @@ -231,6 +231,8 @@ public void merge(W mergeResult, // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) { --- End diff -- I think it should be written this way:
```
if (isSkippedElement && isElementLate(element)) {
    if (lateDataOutputTag != null) {
        sideOutput(element);
    } else {
        this.lostDataCount.inc();
    }
}
```
---
[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/4665#discussion_r138789377 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -132,6 +133,13 @@ */ protected final OutputTag lateDataOutputTag; + /** + * Metrics about the lost data due to arrive late. + * */ + protected final String loseData = "lost_data"; + + protected Counter lostDataCount; --- End diff -- @zentol If I use `protected final Counter lostDataCount = new SimpleCounter()` I run into `Caused by: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter` because `SimpleCounter` is not Serializable, so I think I should keep the old way of registering `this.lostDataCount = metrics.counter(LATE_ELEMENTS_METRIC_NAME);` in the open() method ---
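The serialization constraint above can be reproduced without any Flink dependency: an operator field holding a non-Serializable object must be marked `transient` and re-created in an `open()`-style lifecycle method, or serializing the operator fails. A minimal self-contained sketch; `SimpleCounter` and `WindowOp` here are stand-ins, not Flink's classes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SimpleCounter { // stand-in for a non-Serializable metrics counter
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

class WindowOp implements Serializable {
    // transient: skipped during serialization, so the operator itself stays serializable
    private transient SimpleCounter lateCounter;

    // mirrors registering the counter in open() instead of at construction time
    void open() { lateCounter = new SimpleCounter(); }
    void onLateElement() { lateCounter.inc(); }
    long dropped() { return lateCounter.getCount(); }

    // serialize and deserialize, as Flink does when shipping operators to task managers
    static WindowOp roundTrip(WindowOp op) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            new ObjectOutputStream(bos).writeObject(op);
            return (WindowOp) new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray())).readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

A non-transient `final SimpleCounter` field would make `roundTrip` throw `NotSerializableException`, which is exactly the error quoted above.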
[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/4665#discussion_r138778844 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -132,6 +133,13 @@ */ protected final OutputTag lateDataOutputTag; + /** + * Metrics about the lost data due to arrive late. + * */ + protected final String loseData = "lost_data"; + + protected Counter lostDataCount; --- End diff -- @zentol I have adjusted my code according to the comment and added documentation for this metric, please review again, thanks ---
[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/4665#discussion_r139742368 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -405,6 +411,8 @@ public void merge(W mergeResult, // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) { sideOutput(element); + } else if (isSkippedElement) { --- End diff -- I think that when `isSkippedElement` is true, `isElementLate(element)` is always true as well: `isSkippedElement` is true when, for every assigned window, window.endtime + allowedLateness < currentLowWatermark, and `isElementLate` is true when element.time + allowedLateness < currentLowWatermark; since element.time <= the biggest window.endtime, doesn't `isElementLate` always hold when `isSkippedElement` does? And if I want to rule out the situation **because no windows were assigned to it**, I just need to check whether the collection `elementWindows` is empty, right? ---
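The implication argued above can be checked with plain arithmetic. The two predicates below mirror the inequalities from the comment (window lateness: max window timestamp plus allowed lateness is at or below the watermark; element lateness: element timestamp plus allowed lateness is at or below the watermark); the concrete values in the usage are hypothetical:

```java
class LatenessCheck {
    // a window is late once its end (plus allowed lateness) has passed the watermark
    static boolean windowLate(long windowMaxTs, long allowedLateness, long watermark) {
        return windowMaxTs + allowedLateness <= watermark;
    }

    // an element is late once its own timestamp (plus allowed lateness) has passed the watermark
    static boolean elementLate(long elementTs, long allowedLateness, long watermark) {
        return elementTs + allowedLateness <= watermark;
    }
}
```

Because an element's timestamp never exceeds the max timestamp of a window it was assigned to, lateness of the largest assigned window implies lateness of the element, which is the point the comment makes.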
[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/4665#discussion_r139933981 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -405,6 +411,8 @@ public void merge(W mergeResult, // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) { sideOutput(element); + } else if (isSkippedElement) { --- End diff -- Haha, I see what you mean now, I think it's interesting ^ _ ^. I pushed the code again and modified the metric name according to @zentol's suggestion, please review the code again @aljoscha ---
[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/4665 [Flink-7611]add metrics to measure the num of data dropped due to the data arrived late

## What is the purpose of the change

1. Add a metric to measure the number of elements dropped because they arrived late; this is meaningful for guiding the user toward a suitable allowed lateness or max out-of-orderness time.

## Brief change log

- Register the counter metric in WindowOperator#open()
- Invoke inc() when isWindowLate() judges an element to be late

## Verifying this change

This change is already covered by existing tests via `mvn clean verify`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink FLINK-7611 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4665.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4665

commit 35b684ce1f3b72018ced26af07808390dff68547 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-09-12T04:26:08Z add metrics to measure the data dropped due to arrive late
commit aabdc224cb62b29975834e11c1374d182c4d4d01 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-09-12T05:58:07Z adjust format ---
[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/4665#discussion_r138515752 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -132,6 +133,13 @@ */ protected final OutputTag lateDataOutputTag; + /** + * Metrics about the lost data due to arrive late. + * */ + protected final String loseData = "lost_data"; + + protected Counter lostDataCount; --- End diff -- But I think the OperatorIOMetricGroup is all about an operator's IO metrics; is it a suitable place? ---
[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/4665#discussion_r138515541 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -132,6 +133,13 @@ */ protected final OutputTag lateDataOutputTag; + /** + * Metrics about the lost data due to arrive late. + * */ + protected final String loseData = "lost_data"; + + protected Counter lostDataCount; --- End diff -- Do you mean that I should add this to the OperatorIOMetricGroup? ---
[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4665 Is there still anything wrong @zentol? ---
[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4665 Is this OK to be merged @bowenli86 @zentol @aljoscha ? ---
[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4935 ping @tzulitai ~ ---
[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4935 Hi @tzulitai , after reading the KafkaConsumer code again, I found that the per-partition Kafka lag metric is registered in the method `FetchManagerMetrics#recordPartitionLag`. But when the client receives `max.poll.records` records in a single poll, it returns the polled records early, leaving some partitions that have not yet gone through `sendFetches`, so those partitions' metrics are missed. In testing, if we poll only once and then register the Kafka metrics, with many partitions (around 100) some partition lag metrics are lost. So I think that with a configurable property, users with many partitions could opt in, with little harm to performance. Please let me know your idea, thanks ---
[GitHub] flink pull request #4935: [Flink-7945][Metrics]Fix per partition-l...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/4935#discussion_r148738495 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java --- @@ -245,6 +238,23 @@ public void run() { if (records == null) { try { records = consumer.poll(pollTimeout); + // register Kafka's very own metrics in Flink's metric reporters + if (useMetrics && !records.isEmpty()) { + // register Kafka metrics to Flink + Map<MetricName, ? extends Metric> metrics = consumer.metrics(); + if (metrics == null) { + // MapR's Kafka implementation returns null here. + log.info("Consumer implementation does not support metrics"); + } else { + // we have Kafka metrics, register them + for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) { --- End diff -- I have implemented registering over the first several polls and then skipping registration, which successfully registers the related metrics. Please let me know if you have any suggestions, thanks~ And the commits have been squashed :) ---
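The "register for a few polls, then stop" idea can be sketched independently of the Kafka client. In this stand-alone sketch `maxAttempts` plays the role of the proposed configurable property; the class and metric names are illustrative, not Flink's or Kafka's:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class LagMetricRegistrar {
    private final Set<String> registered = new HashSet<>();
    private final int maxAttempts; // hypothetical user-configurable bound
    private int attempts;

    LagMetricRegistrar(int maxAttempts) { this.maxAttempts = maxAttempts; }

    // called once per poll; re-scans the consumer's metric names until the budget is spent,
    // because per-partition lag metrics only appear after the partition has been fetched
    List<String> maybeRegister(Collection<String> consumerMetricNames) {
        List<String> newlyRegistered = new ArrayList<>();
        if (attempts >= maxAttempts) {
            return newlyRegistered; // skip the scan on the hot path after the budget
        }
        attempts++;
        for (String name : consumerMetricNames) {
            if (registered.add(name)) {
                newlyRegistered.add(name);
            }
        }
        return newlyRegistered;
    }
}
```

The trade-off discussed in the thread is visible here: a larger budget catches metrics for partitions fetched late, while the early-exit keeps the per-record path cheap afterwards.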
[GitHub] flink pull request #4935: [Flink-7945][Metrics]Fix per partition-l...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/4935#discussion_r148738023 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java --- @@ -245,6 +238,23 @@ public void run() { if (records == null) { try { records = consumer.poll(pollTimeout); + // register Kafka's very own metrics in Flink's metric reporters + if (useMetrics && !records.isEmpty()) { + // register Kafka metrics to Flink + Map<MetricName, ? extends Metric> metrics = consumer.metrics(); + if (metrics == null) { + // MapR's Kafka implementation returns null here. + log.info("Consumer implementation does not support metrics"); --- End diff -- I changed the level to debug. ---
[GitHub] flink pull request #4935: [Flink-7945][Metrics]Fix per partition-l...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/4935 [Flink-7945][Metrics]Fix per partition-lag metric lost in kafka connector

## What is the purpose of the change

*When using the Kafka connector, we can't get the per-partition lag metric, although it has been exposed since Kafka 0.10.2 ([KAFKA-4381](https://issues.apache.org/jira/browse/KAFKA-4381)). After reading the Kafka code, I found that the per-partition lag is registered only after `KafkaConsumer#poll` is invoked, so I changed the metric registration time in Flink; with this change, kafka-connector10 and kafka-connector11 show the correct lag metric.*

## Brief change log

- *Change the Kafka metric registration time in the Flink kafka-connector*

## Verifying this change

This change has already been run through the test cases.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink FLINK-7945 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4935.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4935

commit 4f0e405fd0e697e67a0d4dc301d85244fc031086 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T12:51:46Z change the way to get metric in kafkaConsumerThread
commit 183eea766ab6302c4f0813b2372f95a299ead67d Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T14:44:19Z overrride the createCallBridge method in kafkaFetcher10
commit d109efe7e2290eafdedf21fa7fbb4b8ac2d1bb58 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T15:11:41Z remove unused import
commit 7dd26b6ddfe0f16ac57d9810dc46ae6b9fb34d18 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T15:13:34Z checkstyle
commit 61db98e0469d85755d6cea560e110f61b6135739 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T15:29:47Z add debug log
commit b55ab47b819dec90b18b8d57df5978aae0496e11 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T15:41:00Z remove log
commit 64ae04f0846b6fcdc851e98a1df71e486bdf7762 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T15:43:44Z checkstyle
commit bc16ae2ff89e63f71a050483bffb6d8a4389acd0 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T08:37:03Z change the location of register kafka metrics to flink
commit 6fdf8e082669bd69fb730c32c5755660c59d2ab3 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T08:50:12Z checkstyle
commit df2620926077c307510baaf74f0d10bf34fe6a1c Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T09:19:57Z use specific version poll method
commit c7f44b99911665c974706c6025f69aa097657494 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T09:32:00Z method signature
commit b41be18914c0ad8800f6faa30f1fcb0b995e40c0 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T13:52:39Z remove callbridge invoke
commit c0dea5068cbb04763265b8f7dc6d80fc4b7cff49 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T14:29:31Z just for test
commit e3df3a0705329d4e19f03a18b412e03664a62c9c Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T15:06:44Z judge poll success
commit 3dbfa26ee6b46e6a1a6d708dd5bb759ff86014c8 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T15:47:07Z judge ConsumerRecords not empty
commit 7f1f653e6346f0e09cf0582d312ae10d223ba92a Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T15:54:09Z checkstyle
commit 7828945af3e560e782ee12f0cd11018d3f4e8dbf Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T16:11:14Z add flag to judge whether kafka has been registered
commit 3dbd601ae20d1c5163a01e20b991b175f1180aff Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T16:15:24Z doc format
commit f9b8fd4e2c9fc488456b141158d239ce2386a854 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T22:35:16Z add metrics exist judge and remove unsed code
commit c14feacbe7db945f313de4a39dde13ecc1825924 Author: minwenjun <minwen...@
[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4935 cc @zentol @tzulitai please help review the code. ---
[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4935 Hi @tzulitai , could you take a look at this again :-) ? ---
[GitHub] flink issue #4960: Update version to 1.5-SNAPSHOT
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4960 ---
[GitHub] flink pull request #4935: [Flink-7945][Metrics]Fix per partition-l...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/4935#discussion_r148530767 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java --- @@ -245,6 +238,23 @@ public void run() { if (records == null) { try { records = consumer.poll(pollTimeout); + // register Kafka's very own metrics in Flink's metric reporters + if (useMetrics && !records.isEmpty()) { + // register Kafka metrics to Flink + Map<MetricName, ? extends Metric> metrics = consumer.metrics(); + if (metrics == null) { + // MapR's Kafka implementation returns null here. + log.info("Consumer implementation does not support metrics"); + } else { + // we have Kafka metrics, register them + for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) { --- End diff -- Yes, I agree with you that this is not the best way to solve it. What do you think about trying to register the Kafka metrics at the beginning of the job for several attempts, configurable via `properties`? Once beyond that count, we would no longer run the loop~ ---
[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4935 Updated the code according to the comments. Ping @tzulitai ---
[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4665 Hi @zentol , can you help merge this PR? Has it been forgotten? ---
[GitHub] flink issue #4878: [FLINK-7895][hotfix][docs]Fix error in example in get lat...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4878 Yes, I will close this one ~ ---
[GitHub] flink pull request #4878: [FLINK-7895][hotfix][docs]Fix error in example in ...
Github user Aitozi closed the pull request at: https://github.com/apache/flink/pull/4878 ---
[GitHub] flink pull request #4878: [FLINK-7895][hotfix][docs]Fix error in example in ...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/4878 [FLINK-7895][hotfix][docs]Fix error in example in get late message in window doc * The getSideOutput API is only available on the SingleOutputStreamOperator class and is not part of the base class You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink FLINK-7895 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4878.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4878 commit ce09fb63138df9fe00af1281e8ee41cc0ad5bb45 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-20T20:11:58Z fix error in example in get late message in window doc ---
[GitHub] flink pull request #6080: [Flink-9443]Remove unused parameter in generateNod...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/6080 [Flink-9443]Remove unused parameter in generateNodeLocalHash Since Flink 1.2, StreamGraphHasherV2 is used to generate hashes. The method generateNodeLocalHash no longer uses information such as the parallelism or the user function, so the parameter should be removed. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink FLINK-9443 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6080.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6080 commit c174cb29667e27abfb4fd74b0d5bca894f9da2d9 Author: minwenjun <minwenjun@...> Date: 2018-05-26T10:27:07Z Remove unused parameter in generateNodeLocalHash ---
[GitHub] flink issue #6124: [FLINK-8914][CEP]Fix wrong semantic when greedy pattern i...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6124 Got it. Thanks for your response @dawidwys , and I'm looking forward to your further comments on this PR. I am glad to contribute to the CEP library and really hope to get more ideas and discussion from you guys about CEP, thanks. ---
[GitHub] flink issue #6171: [FLINK-9593] Unified After Match semantics with SQL MATCH...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6171 Hi @dawidwys , can you explain a little about how the semantics of `AfterMatch` differ from the previous implementation? I read the doc and feel a little confused. thx ;-) ---
[GitHub] flink pull request #6168: [FLINK-9588]Reused context with same computation s...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/6168 [FLINK-9588]Reused context with same computation state calculate ## What is the purpose of the change Currently CEP checks the filter condition with a newly created ConditionContext for each edge, which results in repeated getEventsForPattern computations because `shouldUpdate` is re-initialized each time. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink context-reuse Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6168.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6168 commit ed71ac4407de9d8163efa8c334d9ac0e63e47069 Author: minwenjun Date: 2018-06-14T14:24:02Z [FLINK-9588]Reused context with same computation state calculate ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r195737755 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) { final Stack> states = new Stack<>(); states.push(state); + ConditionContext context = new ConditionContext(this, sharedBuffer, computationState); --- End diff -- I think it needs the `conditionContext` and `computationState` and should replace the `sharedBuffer` with `conditionContext`. ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r195738359 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) { final Stack> states = new Stack<>(); states.push(state); + ConditionContext context = new ConditionContext(this, sharedBuffer, computationState); --- End diff -- agree ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r195745115 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -609,7 +611,7 @@ public void close() throws Exception { startTimestamp); //check if newly created state is optional (have a PROCEED path to Final state) - final State finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState); + final State finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent()); --- End diff -- I think the sharedBuffer has been changed in the `TAKE` branch, so the conditionContext should be different. ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r195792448 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -609,7 +611,7 @@ public void close() throws Exception { startTimestamp); //check if newly created state is optional (have a PROCEED path to Final state) - final State finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState); + final State finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent()); --- End diff -- Thinking it over again: the content of the `sharedBuffer` affects the result of `getEventsForPattern`, so the result should be updated when the `sharedBuffer` changes. But I think we only have to reset the `shouldUpdate` flag to `true` here rather than create a new context, right? @dawidwys ---
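The flag-reset idea can be illustrated with a small memoizing context. This is a stand-alone sketch of the caching behavior being discussed, not Flink's actual `ConditionContext`; the map stands in for the CEP shared buffer:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class CachingContext {
    private final Map<String, List<String>> sharedBuffer; // stand-in for the CEP shared buffer
    private List<String> cached;
    private boolean shouldUpdate = true;

    CachingContext(Map<String, List<String>> sharedBuffer) {
        this.sharedBuffer = sharedBuffer;
    }

    // memoized lookup: only re-reads the buffer when the flag has been reset
    List<String> getEventsForPattern(String pattern) {
        if (shouldUpdate) {
            cached = new ArrayList<>(sharedBuffer.getOrDefault(pattern, Collections.emptyList()));
            shouldUpdate = false;
        }
        return cached;
    }

    // cheaper than constructing a fresh context after the buffer changes
    void markBufferChanged() { shouldUpdate = true; }
}
```

Resetting the flag invalidates the memoized result without rebuilding the context, which is the trade-off the comment argues for over a `new ConditionContext<>(...)` per call.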
[GitHub] flink issue #6162: [FLINK-9579][CEP]Remove unneeded clear on elementQueueSta...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6162 Thanks for your review; removed the same block in `onProcessingTime`. ---
[GitHub] flink pull request #6162: [FLINK-9579][CEP]Remove unneeded clear on elementQ...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/6162 [FLINK-9579][CEP]Remove unneeded clear on elementQueueState ## What is the purpose of the change Remove the unneeded clear on elementQueueState: when sortedTimestamps is empty, the elements in elementQueueState have all been removed already, so there is no need to clear again and waste time on a RocksDB operation. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink remove-clear Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6162.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6162 commit 62a1a506cf8dab263a247d81fa7092eaa0743624 Author: minwenjun Date: 2018-06-14T06:20:42Z [FLINK-9579][CEP]Remove unneeded clear on elementQueueState ---
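Why the extra clear is redundant can be seen with a map-backed sketch of the queue state: draining every timestamp up to the watermark already removes each entry, so a trailing clear() would touch the state backend for nothing. The names here are illustrative, not the CEP operator's actual fields:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ElementQueue {
    // stand-in for the keyed MapState<Long, List<T>> backing the CEP element queue
    final TreeMap<Long, List<String>> state = new TreeMap<>();

    void add(long ts, String element) {
        state.computeIfAbsent(ts, k -> new ArrayList<>()).add(element);
    }

    // drains all entries with timestamp <= watermark, removing each as it is processed;
    // afterwards nothing at or below the watermark remains, so no clear() is needed
    List<String> drainTo(long watermark) {
        List<String> fired = new ArrayList<>();
        for (Iterator<Map.Entry<Long, List<String>>> it = state.entrySet().iterator(); it.hasNext();) {
            Map.Entry<Long, List<String>> e = it.next();
            if (e.getKey() > watermark) {
                break;
            }
            fired.addAll(e.getValue());
            it.remove();
        }
        return fired;
    }
}
```

With a RocksDB-backed state, the skipped clear() would otherwise be an extra state-backend round trip on every watermark advance.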
[GitHub] flink issue #6162: [FLINK-9579][CEP]Remove unneeded clear on elementQueueSta...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6162 Is this OK? @dawidwys ---
[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6168 please help review this pr @dawidwys , thx. ---
[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/6104 [FLINK-9476]Emit late elements in CEP as sideOutPut Currently, when using event time in the CEP library, elements that arrive later than the watermark are dropped; we can instead put them in a side output with an OutputTag. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink FLINK-9476 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6104.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6104 commit c0b04fadacd3e9f3f403b3adfdbadf8d4aac79e4 Author: minwenjun Date: 2018-05-30T15:32:15Z Add loseDataOutputTag in cep deal with event time dropped data commit 373e376fc182c32fe69765aa564e93057954ff44 Author: minwenjun Date: 2018-05-30T15:50:01Z add scala api ---
[GitHub] flink issue #6104: [FLINK-9476]Emit late elements in CEP as sideOutPut
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6104 @bowenli86 thanks for the review, I have fixed the errors according to the comments and added a unit test in CEPITCase, please help review it again. cc @kl0u ---
[GitHub] flink issue #6080: [FLINK-9443]Remove unused parameter in generateNodeLocalH...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6080 @aljoscha please help review this, thanks ---
[GitHub] flink pull request #6109: [FLINK-9483] 'Building Flink' doc doesn't highligh...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6109#discussion_r192628465 --- Diff: docs/start/building.md --- @@ -50,7 +50,11 @@ mvn clean install -DskipTests This instructs [Maven](http://maven.apache.org) (`mvn`) to first remove all existing builds (`clean`) and then create a new Flink binary (`install`). -To speed up the build you can skip tests, checkstyle, and JavaDocs: `mvn clean install -DskipTests -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true`. +To speed up the build you can skip tests, checkstyle, and JavaDocs: + +{% highlight bash %} +mvn clean install -DskipTests -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true --- End diff -- A little question: what is the meaning of `-Dfast`? ---
[GitHub] flink issue #6059: [Flink-9418] Migrate SharedBuffer to use MapState
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6059 @dawidwys I'm sorry, I don't have a systematic testing tool. I'm working on our internal dynamic CEP, where the `CEPOperator` can process several `NFA`s when it receives an element. I ran into a backpressure problem and tested only whether this patch can overcome it. With this patch applied, 200 parallel CEPOperators can handle about 7000 qps of input with about 30 patterns (rules) without backpressure. The type of pattern may have a different influence, so the numbers I provide here may not be so useful. ---
[GitHub] flink issue #6104: [FLINK-9476]Emit late elements in CEP as sideOutPut
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6104 thx, can help merge this @dawidwys ---
[GitHub] flink issue #6104: [FLINK-9476]Emit late elements in CEP as sideOutPut
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6104 @dawidwys fixed the flaw according to your suggestion ;-) ---
[GitHub] flink pull request #6059: [Flink-9418] Migrate SharedBuffer to use MapState
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6059#discussion_r192558813 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventWrapper.java --- @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa.sharedbuffer; + +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.util.Objects; + +/** + * Thin wrapper around user event that adds a lock. + * + * @param user event type + */ +public class EventWrapper { --- End diff -- Why does this class use the same name as the inner class in NFA.java? Is this intentional? It is a little confusing. ---
[GitHub] flink pull request #6111: [FLINK-9504]Change the log level of checkpoint dur...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/6111 [FLINK-9504]Change the log level of checkpoint duration to debug Currently, every checkpoint logs the per-partition/parallel-instance time cost of the OperatorStateBackend and KeyedStateBackend, which often leads to too much log output in the TaskManager. I think the log level should be changed to debug, is that OK? @StephanEwen You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink FLINK-9504 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6111.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6111 commit 8368e2e82ad6e0605bfbc4692cde3cc09e431816 Author: minwenjun Date: 2018-06-02T13:05:04Z change the log level of checkpoint cost to debug ---
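The change above is only a log-level tweak, but the underlying pattern is worth spelling out: guard verbose per-checkpoint messages behind a debug-level check so production logs stay quiet by default. A minimal sketch with plain `java.util.logging` — the class and message are illustrative stand-ins, not Flink's actual code:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class DebugLogSketch {
    static final Logger LOG = Logger.getLogger(DebugLogSketch.class.getName());

    // Only pass the per-backend checkpoint duration to the logger when the
    // debug (FINE) level is enabled, so a TaskManager running at INFO is not
    // flooded with one line per partition on every checkpoint.
    static boolean logCheckpointDuration(long millis) {
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Checkpoint state snapshot took " + millis + " ms");
            return true;  // message passed the level check
        }
        return false;     // suppressed at INFO and above
    }
}
```

Flink itself uses SLF4J rather than `java.util.logging`, but the `isLoggable`/`isDebugEnabled` guard works the same way in both.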
[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6104#discussion_r192428178 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java --- @@ -54,6 +54,12 @@ // comparator to sort events private final EventComparator comparator; + /** +* Side output {@code OutputTag} for late data. If no tag is set late data will simply be --- End diff -- fixed ---
[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6104#discussion_r192428106 --- Diff: docs/dev/libs/cep.md --- @@ -1524,7 +1524,52 @@ In `CEP` the order in which elements are processed matters. To guarantee that el To guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last -seen watermark. Late elements are not further processed. +seen watermark. Late elements are not further processed. Also, you can specify a sideOutput tag to collect the late elements come after the last seen watermark, you can use it like this. + + + + +{% highlight java %} +PatternStream patternStream = CEP.pattern(input, pattern); + +OutputTag lateDataOutputTag = new OutputTag("late-data""){}; + +OutputTag outputTag = new OutputTag("side-output""){}; --- End diff -- removed ---
[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6104#discussion_r192428038 --- Diff: docs/dev/libs/cep.md --- @@ -1524,7 +1524,52 @@ In `CEP` the order in which elements are processed matters. To guarantee that el To guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last -seen watermark. Late elements are not further processed. +seen watermark. Late elements are not further processed. Also, you can specify a sideOutput tag to collect the late elements come after the last seen watermark, you can use it like this. + + + + +{% highlight java %} +PatternStream patternStream = CEP.pattern(input, pattern); + +OutputTag lateDataOutputTag = new OutputTag("late-data""){}; + +OutputTag outputTag = new OutputTag("side-output""){}; + +SingleOutputStreamOperator result = patternStream +.sideOutputLateData(lateDataOutputTag) +.select( +new PatternTimeoutFunction() {...}, +outputTag, +new PatternSelectFunction() {...} +); + +DataStream lateData = result.getSideOutput(lateDataOutputTag); + + +{% endhighlight %} + + + + + +{% highlight scala %} +val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) + +val lateDataOutputTag = OutputTag[String]("late-data") + +val result: SingleOutputStreamOperator[ComplexEvent] = patternStream +.sideOutputLateData(lateDataOutputTag) +.select(outputTag){ --- End diff -- OK, I removed the timeout tag to reduce confusion, and fixed a small error in the earlier doc (the order of the parameters was wrong) ---
[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6104#discussion_r192428502 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -383,6 +386,100 @@ public String select(Map> pattern) { env.execute(); } + @Test + public void testSimpleKeyedPatternEventTimeWithSideOutput() throws Exception { --- End diff -- moved the test case to CepOperatorTest, please review again, thanks ;) ---
[GitHub] flink issue #6059: [Flink-9418] Migrate SharedBuffer to use MapState
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6059 After going through the whole change, I think it is an extremely good feature 👍, and I will apply this patch to our internal library to test how much the performance improves. A little question: why did the Travis run fail? I checked the _job run failed log_ but can't find the reason. ---
[GitHub] flink pull request #6124: [FLINK-8914][CEP]Fix wrong semantic when greedy pa...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/6124 [FLINK-8914][CEP]Fix wrong semantic when greedy pattern is the head of the pattern ## What is the purpose of the change As described in the jira [FLINK-8914](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-8914), there is something wrong with `greedy` when it is the head of the pattern. The `NFA` processes each `ComputationState` and produces a new start `ComputationState`, so when it runs into a greedy match, other start runs can also be set up. ## Brief change log - *Add a new StateType `Greedy` to make it easier to distinguish greedy states in computations* - *Remove the redundant start state during processing* ## Verifying this change Added two UTs in `GreedyITCase` You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink greedyfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6124.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6124 commit 76c1a4516b4bc98043d944335cc7a0aacd359278 Author: minwenjun Date: 2018-06-05T16:07:55Z Fix wrong semantic when greedy pattern is the head of the pattern ---
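For intuition, the intended `greedy` semantics (consume as many consecutive matching events as possible before handing over to the rest of the pattern) mirror greedy vs. reluctant quantifiers in Java regex. This is only an analogy to illustrate the semantics under discussion, not Flink CEP code:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GreedyHeadSketch {
    // Return the first match of regex in input, or null if there is none.
    static String match(String regex, String input) {
        Matcher m = Pattern.compile(regex).matcher(input);
        return m.find() ? m.group() : null;
    }

    public static void main(String[] args) {
        // A greedy quantifier at the head of the pattern swallows the whole run
        // of matching elements; a reluctant one stops as early as it can.
        System.out.println(match("a+", "aaab"));   // greedy
        System.out.println(match("a+?", "aaab"));  // reluctant
    }
}
```

The bug in FLINK-8914 is that a greedy head should produce one run covering the whole matching prefix, not additional start states for every element of that prefix.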
[GitHub] flink issue #6124: [FLINK-8914][CEP]Fix wrong semantic when greedy pattern i...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6124 @dawidwys please help review this PR when you are free, thanks ---
[GitHub] flink issue #6111: [FLINK-9504]Change the log level of checkpoint duration t...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6111 Thanks @StephanEwen for the review. As you mentioned, I only set the log level in the state backends to `debug`. Is there something wrong with my pull request, or did I misunderstand your meaning? ---
[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6205 Using `entries#putAll` in `flushCache` caused the state-access count in `NFAStateAccessTest` to increase; I will check it locally. This Travis run will fail. ---
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r198151182 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable eventWrapper = eventsBuffer.get(eventId); + Lockable eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + / + // Put + / + + /** +* Put an event to cache. +* @param eventId id of the event +* @param event event body +*/ + private void cacheEvent(EventId eventId, Lockable event) { + this.eventsBufferCache.put(eventId, event); + } + + /** +* Put a ShareBufferNode to cache. +* @param nodeId id of the event +* @param entry SharedBufferNode +*/ + private void cacheEntry(NodeId nodeId, Lockable entry) { + this.entryCache.put(nodeId, entry); + } + + / + // Get + / + + /** +* Try to get the sharedBufferNode from state iff the node has not been quered during this turn process. +* @param nodeId id of the event +* @return SharedBufferNode +* @throws Exception Thrown if the system cannot access the state. +*/ + private Lockable getEntry(NodeId nodeId) throws Exception { + Lockable entry = entryCache.get(nodeId); + return entry != null ? entry : entries.get(nodeId); + } + + private Lockable getEvent(EventId eventId) throws Exception { + Lockable event = eventsBufferCache.get(eventId); + return event != null ? event : eventsBuffer.get(eventId); + } + + /** +* Flush the event and node in map to state. +* @throws Exception Thrown if the system cannot access the state. 
+*/ + public void flushCache() throws Exception { + entryCache.forEach((k, v) -> { + try { + entries.put(k, v); + } catch (Exception e) { + throw new RuntimeException(); + } + } + ); + + eventsBufferCache.forEach((k, v) -> { + try { + eventsBuffer.put(k, v); + } catch (Exception e) { + throw new RuntimeException(); --- End diff -- Got it. ---
[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6205 Fixed the error that state accesses increased in `NFAStateAccessTest` by adding an `isEmpty` check before updating the state. ---
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/6205 [FLINK-9642]Reduce the count to deal with state during a CEP process ## What is the purpose of the change With the rework of the SharedBuffer in FLINK-9418, the lock & release operations go through the state backend, unlike the previous version, which read the whole SharedBuffer state into memory. I think we can add a cache in the SharedBuffer to hold the `Lockable` objects and track reference changes during one NFA processing pass; this reduces the number of state accesses when several events point to the same node. The results are flushed to the MapState at the end of processing. ## Brief change log - add the eventsBufferCache and entryCache - flush the caches after one processing pass You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink onceQueryCache Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6205.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6205 commit 184ec24474ee5b8a1c9f932286ef4aed4f1dabd6 Author: minwenjun Date: 2018-06-23T12:56:55Z [FLINK-9642]Reduce the count to deal with state during a CEP process ---
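The caching idea in this PR can be sketched as a small write-back cache in front of the keyed state: reads check the in-memory cache first, writes go only to the cache, and one flush at the end of processing pushes everything to the state. All names below are illustrative stand-ins for the real `SharedBuffer`/`MapState` code:

```java
import java.util.HashMap;
import java.util.Map;

public class CachedStateSketch {
    final Map<String, Integer> state = new HashMap<>(); // stands in for MapState
    final Map<String, Integer> cache = new HashMap<>(); // per-pass in-memory cache
    int stateAccesses = 0;                              // reads + writes hitting "state"

    Integer get(String key) {
        Integer v = cache.get(key);
        if (v != null) {
            return v;            // served from cache: no state access
        }
        stateAccesses++;
        return state.get(key);
    }

    void put(String key, Integer value) {
        cache.put(key, value);   // deferred: not yet written to state
    }

    // Flush once per processing pass; repeated updates to the same key
    // collapse into a single state write.
    void flushCache() {
        stateAccesses += cache.size();
        state.putAll(cache);
        cache.clear();
    }
}
```

When many lock/release operations touch the same node during one pass, this turns N state reads and writes into at most one of each.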
[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6205 Hi @zhangminglei, thanks for your review. I had only checked the SharedBufferTest locally before. The error in Travis means the number of state accesses (reads and writes) is now less than before, which is the purpose of this PR, and I have fixed the error. cc @dawidwys ---
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r197629310 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable eventWrapper = eventsBuffer.get(eventId); + Lockable eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + / + // Put + / + + /** +* Put a event to cache. +* @param eventId id of the event --- End diff -- fixed ---
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r197629218 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -75,6 +76,9 @@ private MapState eventsCount; private MapState> entries; + private HashMap> eventsBufferCache = new HashMap<>(); + private HashMap> entryCache = new HashMap<>(); --- End diff -- agree and fixed ---
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r197629205 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable eventWrapper = eventsBuffer.get(eventId); + Lockable eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + / + // Put + / + + /** +* Put a event to cache. +* @param eventId id of the event +* @param event event body +*/ + private void cacheEvent(EventId eventId, Lockable event) { + this.eventsBufferCache.put(eventId, event); + } + + /** +* Put a ShareBufferNode to cache. +* @param nodeId id of the event +* @param entry SharedBufferNode +*/ + private void cacheEntry(NodeId nodeId, Lockable entry) { + this.entryCache.put(nodeId, entry); + } + + / + // Get + / + + /** +* Try to get the sharedBufferNode from state iff the node has not been quered during this turn process. +* @param nodeId id of the event --- End diff -- it means `if and only if`. ---
[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6205 The Travis error this time seems unrelated. ---
[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6168 Is it OK now? @dawidwys ---
[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6205 Does this look OK now? ping @sihuazhou @dawidwys ---
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r198142501 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable eventWrapper = eventsBuffer.get(eventId); + Lockable eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + / + // Put + / + + /** +* Put an event to cache. +* @param eventId id of the event +* @param event event body +*/ + private void cacheEvent(EventId eventId, Lockable event) { + this.eventsBufferCache.put(eventId, event); + } + + /** +* Put a ShareBufferNode to cache. +* @param nodeId id of the event +* @param entry SharedBufferNode +*/ + private void cacheEntry(NodeId nodeId, Lockable entry) { + this.entryCache.put(nodeId, entry); + } + + / + // Get + / + + /** +* Try to get the sharedBufferNode from state iff the node has not been quered during this turn process. +* @param nodeId id of the event +* @return SharedBufferNode +* @throws Exception Thrown if the system cannot access the state. +*/ + private Lockable getEntry(NodeId nodeId) throws Exception { + Lockable entry = entryCache.get(nodeId); + return entry != null ? entry : entries.get(nodeId); + } + + private Lockable getEvent(EventId eventId) throws Exception { + Lockable event = eventsBufferCache.get(eventId); + return event != null ? event : eventsBuffer.get(eventId); + } + + /** +* Flush the event and node in map to state. +* @throws Exception Thrown if the system cannot access the state. 
+*/ + public void flushCache() throws Exception { + entryCache.forEach((k, v) -> { --- End diff -- OK. Does this benefit from the `RocksDBWriteBatchWrapper` when using `putAll`? ---
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r198143091 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable eventWrapper = eventsBuffer.get(eventId); + Lockable eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + / + // Put + / + + /** +* Put an event to cache. +* @param eventId id of the event +* @param event event body +*/ + private void cacheEvent(EventId eventId, Lockable event) { + this.eventsBufferCache.put(eventId, event); + } + + /** +* Put a ShareBufferNode to cache. +* @param nodeId id of the event +* @param entry SharedBufferNode +*/ + private void cacheEntry(NodeId nodeId, Lockable entry) { + this.entryCache.put(nodeId, entry); + } + + / + // Get + / + + /** +* Try to get the sharedBufferNode from state iff the node has not been quered during this turn process. +* @param nodeId id of the event +* @return SharedBufferNode +* @throws Exception Thrown if the system cannot access the state. +*/ + private Lockable getEntry(NodeId nodeId) throws Exception { + Lockable entry = entryCache.get(nodeId); + return entry != null ? entry : entries.get(nodeId); + } + + private Lockable getEvent(EventId eventId) throws Exception { + Lockable event = eventsBufferCache.get(eventId); + return event != null ? event : eventsBuffer.get(eventId); + } + + /** +* Flush the event and node in map to state. +* @throws Exception Thrown if the system cannot access the state. 
+*/ + public void flushCache() throws Exception { + entryCache.forEach((k, v) -> { + try { + entries.put(k, v); + } catch (Exception e) { + throw new RuntimeException(); + } + } + ); + + eventsBufferCache.forEach((k, v) -> { + try { + eventsBuffer.put(k, v); + } catch (Exception e) { + throw new RuntimeException(); --- End diff -- But I don't know how to handle the checked exception inside a Java 8 stream lambda; do you have a better way to deal with this situation? Thanks. ---
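Two common answers to the question above about checked exceptions in Java 8 lambdas: wrap the exception in an unchecked one that keeps the original as its cause (a bare `new RuntimeException()`, as in the diff, loses it), or fall back to a plain loop, which can simply declare the checked exception. A sketch with hypothetical state maps, not the real Flink `MapState` API:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class LambdaExceptionSketch {
    // Stand-in for a state-backend write that declares a checked exception.
    static void statePut(Map<String, Integer> state, String k, Integer v) throws IOException {
        state.put(k, v);
    }

    // Option 1: wrap inside the lambda, preserving the original exception as the cause.
    static Map<String, Integer> flushWithForEach(Map<String, Integer> cache) {
        Map<String, Integer> state = new HashMap<>();
        cache.forEach((k, v) -> {
            try {
                statePut(state, k, v);
            } catch (IOException e) {
                // keep the cause and some context, instead of new RuntimeException()
                throw new RuntimeException("flush failed for key " + k, e);
            }
        });
        return state;
    }

    // Option 2: a plain loop can propagate the checked exception directly.
    static Map<String, Integer> flushWithLoop(Map<String, Integer> cache) throws IOException {
        Map<String, Integer> state = new HashMap<>();
        for (Map.Entry<String, Integer> e : cache.entrySet()) {
            statePut(state, e.getKey(), e.getValue());
        }
        return state;
    }
}
```

Since `forEach` gives no checked-exception escape hatch, the loop version is arguably the cleaner fit when the method already declares `throws Exception`.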
[GitHub] flink pull request #6234: [FLINK-9431]Introduce time bounded condition to ce...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/6234 [FLINK-9431]Introduce time bounded condition to cep ## What is the purpose of the change In CEP, events currently drive the transformation of the NFA; I think the time factor should also be taken into account in some scenarios. When a key's data is not endless and we want to match the following pattern, we cannot emit a result after matching `AB`, even once `B` has been present for ten seconds: ``` Pattern.begin("A").followedBy("B").notFollowedBy("C") ``` We cannot emit the result because no branch can lead to the `Final State`. I think we can add a `TimeEnd` state to describe a pattern that accepts a time condition, evaluated in processing time / event time against the timestamps of elements seen before. As described in the issue link, there are two main reasons why I introduce this feature: 1. `notFollowedBy` can't be at the end of a pattern 2. `within` only compares against the element at the start, and some keys' data may not be endless, so we have to evaluate conditions not only on events but also on time ## Brief change log 1. Add a method to `IterativeCondition` to distinguish event-driven from time-driven conditions 2. On `advanceTime`, we not only prune expired elements but also evaluate the time bounded conditions ## Verifying this change This change is already covered by existing CEP tests; it may need a few more around the new API. ## Documentation - Does this pull request introduce a new feature? (yes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink timeEnd-state Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6234.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6234 commit b1aa992a97c8eac818e57c3d2f82be76957052d0 Author: minwenjun Date: 2018-07-01T14:41:44Z [FLINK-9431]Introduce time bounded condition to cep ---
[GitHub] flink pull request #5405: [FLINK-8477][Window]Add api to support user to ski...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/5405 [FLINK-8477][Window]Add api to support user to skip several broken windows In production, monitoring-style applications need accurate data, but consider this scenario: if we start a job at 10:45:20 with a 1-minute window aggregate, we may produce broken data for the 10:45 window, which can lead to mistakes. We can provide a user API that lets users choose to skip several windows to avoid the broken data. ## Brief change log - add a streaming api You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink FLINK-8477 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5405.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5405 commit 9c6b77077bac2e0dfa4ea3bddf11bd27831ba3e4 Author: minwenjun <minwenjun@...> Date: 2018-02-02T15:46:11Z Add api to support user to skip serval broken window ---
[GitHub] flink issue #5405: [FLINK-8477][Window]Add api to support user to skip serva...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/5405 cc @aljoscha please help review this patch. ![image](https://user-images.githubusercontent.com/9486140/35761522-6e00f4b8-08c4-11e8-8063-7ec015802428.png) See the picture above: when the user chooses to run without a checkpoint (to avoid catching up on old data after a crash) and uses kafka#setStartFromLatest to consume the latest data, then without the skip API this can produce broken data, which may trigger alerts in a monitoring scenario. If users want to skip the broken windows, they can choose to skip several windows after the first fire. ---
[GitHub] flink issue #5405: [FLINK-8477][Window]Add api to support user to skip serva...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/5405 Hi @aljoscha, you mentioned two points: 1. Events may arrive out of order in event-time processing 2. We can use a WindowFunction or ProcessWindowFunction to filter several windows by checking the window's start and end time. I have some different ideas: 1. When we deal with an out-of-order event-time stream, we may specify maxOutOfOrder to bound how late elements can be; so when the job starts or restarts, the max number of windows to skip can be set to maxOutOfOrder / (length of the tumbling window), so that late elements will not produce incorrect results. The number of windows to skip depends on the degree of out-of-orderness 2. We need to skip several broken windows of data, and we don't know which windows are broken; we can only detect which window fires first, and the several windows after it are broken too. The number should vary with the production setup (according to maxOutOfOrder & the window length) ---
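The heuristic in point 1 above (skip roughly maxOutOfOrder divided by the tumbling-window length, counted from the first fire) is simple division; rounding up is an assumption added here so that a partial window still counts as potentially broken. Names are illustrative:

```java
public class SkipWindowSketch {
    // Number of whole tumbling windows that may still be affected by
    // out-of-order data after the first fire: ceil(maxOutOfOrderMs / windowSizeMs).
    static long windowsToSkip(long maxOutOfOrderMs, long windowSizeMs) {
        return (maxOutOfOrderMs + windowSizeMs - 1) / windowSizeMs;
    }
}
```

For example, a 90 s out-of-order bound with 1-minute windows would suggest skipping 2 windows, while a 5 s bound would suggest 1.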
[GitHub] flink issue #5405: [FLINK-8477][Window]Add api to support user to skip serva...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/5405 ping @aljoscha ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user Aitozi closed the pull request at: https://github.com/apache/flink/pull/6168 ---
[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6224 Aha, I meant to improve the performance of the WindowOperator. In the scenario mentioned in the issue, this PR can avoid that bug, a lucky hit ;-) ---
[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6205 Resolved the conflicts, please review when you are free @dawidwys. ---
[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6224 OK 👍 ---
[GitHub] flink issue #6111: [FLINK-9504]Change the log level of checkpoint duration t...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6111 Hi @StephanEwen, are there any changes that should be made to this PR? ---
[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6168 Hi @dawidwys, since this commit has been merged, should this PR be closed by me? ---
[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6224 ping @aljoscha, could you help review it? I'd like to hear your opinion on this PR too. Thanks ---
[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6205 Could you take a look at this PR @dawidwys ? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r201895445

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    +		return () -> new IteratorWithCleanup(finalResult.iterator());
    +	}
    +
    +	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +			.filter(v -> !expired(v))
    +			.map(this::rewrapWithNewTs)
    +			.collect(Collectors.toList());
    +		if (!unexpiredWithUpdatedTs.isEmpty()) {
    +			original.update(unexpiredWithUpdatedTs);
    +		}
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +		original.add(wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    +	}
    +
    +	@Override
    +	public List<T> getInternal() throws Exception {
    +		return collect(get());
    +	}
    +
    +	private <E> List<E> collect(Iterable<E> iterable) {
    --- End diff --

    Hi @azagrebin, a small doubt about your suggestion:

    > return Iterable and avoid querying backend if not needed

    When dealing with ListState, doesn't `original.get()` already query the original `Iterable` from the RocksDB backend? Or does this approach only make iterating over the elements lazy in memory?

---
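Aitozi's question about lazy iteration can be illustrated in isolation. The sketch below uses hypothetical names (not Flink's actual `TtlListState`): it wraps an already-fetched `Iterable` so that expired entries are filtered out lazily, element by element, while iterating. The backend query still happens once up front; what lazy iteration avoids is a second pass or an extra materialized copy.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Minimal sketch (hypothetical, not Flink's actual classes): the backing list is
// fetched eagerly, but expiry filtering happens lazily, per element, during iteration.
class LazyTtlIterable {

    static final class TtlValue<T> {
        final T value;
        final long lastAccessTs;
        TtlValue(T value, long lastAccessTs) { this.value = value; this.lastAccessTs = lastAccessTs; }
    }

    static <T> Iterable<T> unexpired(Iterable<TtlValue<T>> backing, long now, long ttl) {
        return () -> new Iterator<T>() {
            private final Iterator<TtlValue<T>> it = backing.iterator();
            private T next;

            private void advance() {
                // skip expired entries without ever copying the backing collection
                while (next == null && it.hasNext()) {
                    TtlValue<T> v = it.next();
                    if (now - v.lastAccessTs < ttl) {
                        next = v.value;
                    }
                }
            }

            @Override public boolean hasNext() { advance(); return next != null; }

            @Override public T next() {
                advance();
                if (next == null) throw new NoSuchElementException();
                T result = next;
                next = null;
                return result;
            }
        };
    }

    public static void main(String[] args) {
        List<TtlValue<String>> stored = List.of(
            new TtlValue<>("fresh", 90L), new TtlValue<>("stale", 10L));
        for (String s : unexpired(stored, 100L, 50L)) {
            System.out.println(s);
        }
    }
}
```

With `now = 100` and `ttl = 50`, only `"fresh"` (last accessed at 90) survives iteration; `"stale"` is skipped as the iterator advances.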
[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6186 Hi, after reading the whole implementation, I found that state is only expired when it is accessed. If stale data is stored in state and never queried again, how can it ever expire? Or is there ongoing work for this? @azagrebin ---
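As background for this question: one way entries that are never read again can still be reclaimed is a periodic background sweep over the stored timestamps. The following is only a hedged sketch of that idea (a hypothetical `TtlSweep` helper, not Flink's implementation; Flink later addressed this with cleanup during full snapshots and incremental cleanup strategies):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical background-cleanup sketch: periodically scan all entries and drop
// those whose last-access timestamp is older than the TTL, so state that is never
// read again is still reclaimed eventually.
class TtlSweep {

    static int sweep(Map<String, Long> lastAccessTs, long now, long ttl) {
        int removed = 0;
        for (Iterator<Map.Entry<String, Long>> it = lastAccessTs.entrySet().iterator(); it.hasNext(); ) {
            if (now - it.next().getValue() >= ttl) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        state.put("neverReadAgain", 0L);  // written once, never queried
        state.put("recent", 95L);
        System.out.println(sweep(state, 100L, 50L) + " entries removed");
    }
}
```

The trade-off is that a sweep touches every entry, which is why read-time expiry is attractive as the cheap default and background cleanup is a separate, throttled mechanism.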
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r201971238

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    (same diff context as the previous review comment, anchored at `private <E> List<E> collect(Iterable<E> iterable) {`)
    --- End diff --

    Got it, thanks for your explanation.

---
[GitHub] flink pull request #:
Github user Aitozi commented on the pull request: https://github.com/apache/flink/commit/8c89f3c6b5ebd0334176d9e7e57b38b4d39a594a#commitcomment-29640344 Hi, was the logic change of the Trigger discussed in this PR: https://github.com/apache/flink/pull/6224 ---
[GitHub] flink pull request #6224: [FLINK-9687]Delay the state fetch only when the tr...
Github user Aitozi closed the pull request at: https://github.com/apache/flink/pull/6224 ---
[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6224 Got it, thanks for your explanation ;-). I will close this PR. ---
[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6224 Hi @aljoscha, I read the PR for https://issues.apache.org/jira/browse/FLINK-5363, and I think it is not the same thing as this PR. That change made sure the `TriggerResult` is obtained no matter whether the window content is empty or not, while I meant to check the `TriggerResult` first to avoid fetching the window state when the trigger is not ready to `FIRE`. Since fetching the window state is much more expensive, I think we can get the trigger result first. ---
[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6224 Got it, this change indeed can lead to the situation you mentioned. > What are the cases where this change leads to an improvement I just read the code and thought the order was not suitable. > I think it rarely happens that a timer fires while a window is empty I don't think that is the real point here, because currently we fetch the window contents every time before getting the trigger result. AFAIK the `internalTimerService` in `WindowOperator` has both a `cleanupTimer` and a `fireTimer`, and we only need the window contents for the `fireTimer` of a window. In the current implementation we fetch them for both the `cleanupTimer` and the `fireTimer`. ---
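The reordering argued for in this thread can be sketched abstractly (hypothetical interfaces, not Flink's actual `WindowOperator` code): ask the trigger first, and fetch the window contents only when the trigger actually fires, so cleanup timers and non-firing timers skip the expensive state access.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the proposed ordering: consult the trigger first, and
// only pay for fetching window contents (e.g. from RocksDB) when it fires.
class TriggerFirst {

    enum TriggerResult { CONTINUE, FIRE }

    static String onEventTime(Supplier<TriggerResult> trigger, Supplier<String> fetchContents) {
        TriggerResult result = trigger.get();
        if (result != TriggerResult.FIRE) {
            return null; // cleanup timers and non-firing timers skip the state fetch
        }
        return fetchContents.get(); // only firing timers pay the fetch cost
    }

    public static void main(String[] args) {
        String emitted = onEventTime(() -> TriggerResult.FIRE, () -> "window-contents");
        System.out.println(emitted);
        String skipped = onEventTime(() -> TriggerResult.CONTINUE,
            () -> { throw new AssertionError("contents must not be fetched"); });
        System.out.println(skipped);
    }
}
```

The counter-argument in the thread is that some triggers legitimately need to observe every timer even for empty windows, which is why the existing order was kept.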
[GitHub] flink issue #3105: [FLINK-4641] [cep] Support branching CEP patterns
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/3105 Why was it closed? Has it been merged in? ---
[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6224 The Travis failure shows the `checkClusterEmpty` test failing. Did this happen because the YARN cluster is reused? It seems unrelated to this pull request. ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6168#discussion_r197482701

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -609,7 +611,7 @@ public void close() throws Exception {
     				startTimestamp);
     
     			//check if newly created state is optional (have a PROCEED path to Final state)
    -			final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
    +			final State<T> finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent());
    --- End diff --

    Yes, you are right. Reading the code again, the `TAKE` branch only puts the new `Node` into the sharedBuffer, pointing to the previousNodeId; this indeed doesn't affect the result of the current ComputationState's partial match. I will take your suggestion.

---
[jira] [Updated] (FLINK-7611) add metrics to measure the data drop by watermark
[ https://issues.apache.org/jira/browse/FLINK-7611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] aitozi updated FLINK-7611: -------------------------- Affects Version/s: 1.3.0 > add metrics to measure the data drop by watermark > -------------------------------------------------- > > Key: FLINK-7611 > URL: https://issues.apache.org/jira/browse/FLINK-7611 > Project: Flink > Issue Type: Improvement > Components: Metrics > Affects Versions: 1.2.0, 1.3.0 > Reporter: aitozi > Priority: Minor > Labels: patch > Original Estimate: 168h > Remaining Estimate: 168h > > When using the window operator with event time, data that arrives after > window.endtime + allowedLateness is dropped, but there is no existing > metric to measure the number of dropped records; such a metric would help > to set a correct allowed lateness. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7611) add metrics to measure the data drop by watermark
[ https://issues.apache.org/jira/browse/FLINK-7611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] aitozi updated FLINK-7611: -------------------------- Affects Version/s: 1.2.0 > add metrics to measure the data drop by watermark > -------------------------------------------------- > > Key: FLINK-7611 > URL: https://issues.apache.org/jira/browse/FLINK-7611 > Project: Flink > Issue Type: Improvement > Components: Metrics > Affects Versions: 1.2.0 > Reporter: aitozi > Priority: Minor > Labels: patch > Original Estimate: 168h > Remaining Estimate: 168h > > When using the window operator with event time, data that arrives after > window.endtime + allowedLateness is dropped, but there is no existing > metric to measure the number of dropped records; such a metric would help > to set a correct allowed lateness. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
[ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168852#comment-16168852 ] aitozi commented on FLINK-7608: ------------------------------- I doubt whether it needs to expose the values of p50, p95, and p99. I don't think it needs to keep that many values; it just has to reflect the current latency of the operator. > LatencyGauge change to histogram metric > ---------------------------------------- > > Key: FLINK-7608 > URL: https://issues.apache.org/jira/browse/FLINK-7608 > Project: Flink > Issue Type: Bug > Components: Metrics > Reporter: Hai Zhou > Assignee: Hai Zhou > Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > I used slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831] to > export metrics to the log file. > I found: > {noformat} > -- Gauges ------------------------- > .. > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Map.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, > p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}} > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Sink- Unnamed.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, > p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}} > .. > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
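For context on why FLINK-7608 wants a histogram rather than a gauge: a gauge only holds the latest value, while a histogram keeps a reservoir of samples from which percentiles such as the p50/p95/p99 in the log output above can be derived. A minimal nearest-rank percentile sketch (not Flink's metrics API):

```java
import java.util.Arrays;

// Minimal percentile sketch (not Flink's metrics API): a single gauge only keeps
// the latest latency, while a histogram's sample reservoir lets us answer
// p50/p95/p99 queries about the whole distribution.
class LatencyHistogram {

    static double percentile(long[] samples, double q) {
        long[] sorted = samples.clone();
        Arrays.sort(sorted);
        // nearest-rank method: index of the q-th quantile in the sorted samples
        int idx = (int) Math.ceil(q * sorted.length) - 1;
        return sorted[Math.max(idx, 0)];
    }

    public static void main(String[] args) {
        long[] latenciesMs = {11, 30, 59, 60, 90, 95, 100, 110, 115, 116};
        System.out.println("p50=" + percentile(latenciesMs, 0.50));
        System.out.println("p95=" + percentile(latenciesMs, 0.95));
        System.out.println("p99=" + percentile(latenciesMs, 0.99));
    }
}
```

This is why p50/p95/p99 carry more information than "the current latency": tail percentiles expose occasional slow records that a single latest value would hide.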
[jira] [Created] (FLINK-7611) add metrics to measure the data drop by watermark
aitozi created FLINK-7611: -------------------------- Summary: add metrics to measure the data drop by watermark Key: FLINK-7611 URL: https://issues.apache.org/jira/browse/FLINK-7611 Project: Flink Issue Type: Improvement Components: Metrics Reporter: aitozi Priority: Minor When using the window operator with event time, data that arrives after window.endtime + allowedLateness is dropped, but there is no existing metric to measure the number of dropped records; such a metric would help to set a correct allowed lateness. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
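The decision logic this issue led to, as discussed in the PR #4665 review thread, counts an element as dropped only when it is late, was skipped by every window, and no late-data side output is configured. A standalone sketch of that decision, with hypothetical types standing in for the `WindowOperator` internals:

```java
// Standalone sketch of the late-element decision discussed in the PR #4665 review
// (hypothetical types, mirroring the reviewed WindowOperator snippet): an element
// counts as dropped only if it was skipped by all windows, is late, and there is
// no side output configured for late data.
class LateDropDecision {

    static long numLateElementsDropped = 0;

    static String handleLateElement(boolean isSkippedElement, boolean isElementLate, boolean hasLateDataOutputTag) {
        if (isSkippedElement && isElementLate) {
            if (hasLateDataOutputTag) {
                return "side-output";      // route to the late-data side output
            }
            numLateElementsDropped++;      // truly lost: count it
            return "dropped";
        }
        return "processed";                // element was assigned to some window
    }

    public static void main(String[] args) {
        System.out.println(handleLateElement(true, true, false));
        System.out.println("numLateElementsDropped=" + numLateElementsDropped);
    }
}
```

Counting only this branch avoids the over-counting fixed during review: elements routed to a side output are not lost, so they must not increment the metric.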
[jira] [Commented] (FLINK-7611) add metrics to measure the data drop by watermark
[ https://issues.apache.org/jira/browse/FLINK-7611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16164008#comment-16164008 ] aitozi commented on FLINK-7611: ------------------------------- I have opened the PR: https://github.com/apache/flink/pull/4665. Anyone who cares about this issue can comment there, thank you. > add metrics to measure the data drop by watermark > -------------------------------------------------- > > Key: FLINK-7611 > URL: https://issues.apache.org/jira/browse/FLINK-7611 > Project: Flink > Issue Type: Improvement > Components: Metrics > Affects Versions: 1.2.0, 1.3.0 > Reporter: aitozi > Priority: Minor > Labels: patch > Original Estimate: 168h > Remaining Estimate: 168h > > When using the window operator with event time, data that arrives after > window.endtime + allowedLateness is dropped, but there is no existing > metric to measure the number of dropped records; such a metric would help > to set a correct allowed lateness. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7945) Per partition-lag metric lost in kafka connector
[ https://issues.apache.org/jira/browse/FLINK-7945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] aitozi updated FLINK-7945: -------------------------- Summary: Per partition-lag metric lost in kafka connector (was: kafka-connector11 use kafkaConsumer0.9 caused it lost the important metric in kafka consumer clients11) > Per partition-lag metric lost in kafka connector > ------------------------------------------------ > > Key: FLINK-7945 > URL: https://issues.apache.org/jira/browse/FLINK-7945 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Metrics > Affects Versions: 1.4.0, 1.3.2 > Reporter: aitozi > Assignee: aitozi > Priority: Major > > When I wanted to add a per-partition kafka-lag metric, I found that the Kafka 0.11 connector uses the Kafka09Fetcher, which in turn uses the 0.9 Kafka client, so newer metrics such as the per-partition consumer lag are lost. -- This message was sent by Atlassian JIRA (v6.4.14#64029)