[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2386 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r83207030

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -273,7 +276,7 @@ public void releaseOutputs() {
     // now create the operator and give it the output collector to write its output to
     OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
    -chainedOperator.setup(containingTask, operatorConfig, output);
    +chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
    --- End diff --

    The reason we still differentiate between those is that we don't want to keep the latency statistics for all parallel source instances at the intermediate operators.

    So in the current implementation:
    - Intermediate operators maintain latency statistics for each logical source (so not much data)
    - Sinks maintain latency statistics for all parallel instances of each logical source (a bit more data)

    The sinks maintain the additional data so that users can debug latency issues with individual parallel instances (for example, when a machine running a source has issues).
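The granularity difference described in the comment above can be pictured as a choice of map key. The following is only a sketch under assumptions: the class `LatencyKeys`, its methods, and the `vertexId/subtaskIndex` key format are hypothetical and are not taken from the PR.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration; these names do not come from the PR.
final class LatencyKeys {

    /** Returns the key under which an operator files a marker's latency. */
    static String keyFor(int sourceVertexId, int sourceSubtaskIndex, boolean isSink) {
        // Sinks keep one entry per parallel source instance;
        // all other operators collapse the instances into the logical source.
        return isSink
                ? sourceVertexId + "/" + sourceSubtaskIndex
                : String.valueOf(sourceVertexId);
    }

    /** Counts how many separate statistics entries the given markers would create. */
    static int distinctEntries(int[][] markers, boolean isSink) {
        Set<String> keys = new HashSet<>();
        for (int[] marker : markers) {
            // marker[0] is the source vertex id, marker[1] the subtask index
            keys.add(keyFor(marker[0], marker[1], isSink));
        }
        return keys.size();
    }
}
```

With one logical source running at parallelism 4, an intermediate operator would hold a single entry in this model, while a sink would hold four.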
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r83203023

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -273,7 +276,7 @@ public void releaseOutputs() {
     // now create the operator and give it the output collector to write its output to
     OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
    -chainedOperator.setup(containingTask, operatorConfig, output);
    +chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
    --- End diff --

    Do we still need the differentiation between sinks and non-sinks now that we have decided to report the latency for all operators?
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r83201419

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -273,7 +276,7 @@ public void releaseOutputs() {
     // now create the operator and give it the output collector to write its output to
     OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
    -chainedOperator.setup(containingTask, operatorConfig, output);
    +chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
    --- End diff --

    We would not need to set this as a value in the `StreamConfig` class, because it's basically a check whether `StreamConfig.getOutEdgesInOrder().size() == 0`.
    The problem is that `getOutEdgesInOrder()` expects the user code classloader, and the classloader is usually no longer available in the operators.
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r78149051

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -322,6 +346,151 @@ public final ChainingStrategy getChainingStrategy() {
     return chainingStrategy;
     }
    +
    +//
    +// Metrics
    +//
    +
    +// --- One input stream
    +public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
    +    reportOrForwardLatencyMarker(latencyMarker);
    +}
    +
    +// --- Two input stream
    +public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
    +    reportOrForwardLatencyMarker(latencyMarker);
    +}
    +
    +public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
    +    reportOrForwardLatencyMarker(latencyMarker);
    +}
    +
    +
    +protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
    +    if(isSink) {
    --- End diff --

    I forgot to push my latest commit, where this is fixed ;)
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r78148192

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -125,6 +126,11 @@ public void processWatermark(Watermark mark) throws Exception {
     }
     @Override
    +public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
    --- End diff --

    Why is this necessary?
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r78148223

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -322,6 +346,151 @@ public final ChainingStrategy getChainingStrategy() {
     return chainingStrategy;
     }
    +
    +//
    +// Metrics
    +//
    +
    +// --- One input stream
    +public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
    +    reportOrForwardLatencyMarker(latencyMarker);
    +}
    +
    +// --- Two input stream
    +public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
    +    reportOrForwardLatencyMarker(latencyMarker);
    +}
    +
    +public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
    +    reportOrForwardLatencyMarker(latencyMarker);
    +}
    +
    +
    +protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
    +    if(isSink) {
    --- End diff --

    Don't all operators now keep the latency statistics?
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r78147868

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java ---
    @@ -95,11 +100,17 @@ public DirectedOutput(
     @Override
     public void emitWatermark(Watermark mark) {
    -    for (Output out : allOutputs) {
    +    for (Output out: allOutputs) {
    --- End diff --

    I think the space was good here.
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77997800

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -273,7 +276,7 @@ public void releaseOutputs() {
     // now create the operator and give it the output collector to write its output to
     OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
    -chainedOperator.setup(containingTask, operatorConfig, output);
    +chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
    --- End diff --

    Could we not set this value in the `operatorConfig` instead?
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77997086

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java ---
    @@ -83,6 +83,7 @@ public void processWatermark2(Watermark mark) throws Exception {
     }
     }
    +
    --- End diff --

    Unnecessary new line.
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77997021

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -347,4 +521,5 @@ public void close() {
     output.close();
     }
     }
    +
    --- End diff --

    Unnecessary new line.
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996891

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -94,20 +102,38 @@
     /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
     private transient KeyedStateBackend keyedStateBackend;
    -protected transient MetricGroup metrics;
    +// --- Metrics ---
    +
    +/** Metric group for the operator */
    +protected MetricGroup metrics;
    +
    +/** Flag indicating if this operator is a sink */
    +protected transient boolean isSink = false;
    +
    +protected LatencyGauge latencyGauge;
    +
     //
     // Life Cycle
     //
     @Override
    -public void setup(StreamTask containingTask, StreamConfig config, Output output) {
    +public void setup(StreamTask containingTask, StreamConfig config, Output output, boolean isSink) {
         this.container = containingTask;
         this.config = config;
         String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
         this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
         this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
    +    this.isSink = isSink;
    +    Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
    +    int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
    +    if(historySize <= 0) {
    +        LOG.warn("{} has been set to a value below 0: {}. Using default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize);
    +        historySize = ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE;
    +    }
    +
    +    latencyGauge = new LatencyGauge(this.metrics, historySize, !isSink);
    --- End diff --

    This looks a bit odd; `latencyGauge = this.metrics.gauge(new LatencyGauge(historySize, isSink), "latency")` would be more consistent with how other metrics are registered.
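The registration style suggested in the comment above, where the metric group registers the gauge and hands it back instead of the gauge receiving the group in its constructor, might look roughly like this. It is a sketch with stand-in types: `SketchGauge`, `SketchMetricGroup`, and `SketchLatencyGauge` are hypothetical, are not Flink's interfaces, and the name-first parameter order is a choice made for this sketch only.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-ins for illustration; NOT Flink's real metric interfaces.
interface SketchGauge<T> {
    T getValue();
}

final class SketchMetricGroup {
    private final Map<String, SketchGauge<?>> gauges = new HashMap<>();

    /** Registers the gauge under the given name and returns it to the caller. */
    <T, G extends SketchGauge<T>> G gauge(String name, G gauge) {
        gauges.put(name, gauge);
        return gauge;
    }

    boolean isRegistered(String name) {
        return gauges.containsKey(name);
    }
}

final class SketchLatencyGauge implements SketchGauge<Long> {
    private long lastLatencyMillis;

    /** Stores the most recently observed latency. */
    void report(long latencyMillis) {
        lastLatencyMillis = latencyMillis;
    }

    @Override
    public Long getValue() {
        return lastLatencyMillis;
    }
}
```

The caller keeps the returned reference to report values, and the gauge itself stays unaware of the group it was registered with.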
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996545

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -94,20 +102,38 @@
     /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
     private transient KeyedStateBackend keyedStateBackend;
    -protected transient MetricGroup metrics;
    +// --- Metrics ---
    +
    +/** Metric group for the operator */
    +protected MetricGroup metrics;
    +
    +/** Flag indicating if this operator is a sink */
    +protected transient boolean isSink = false;
    +
    +protected LatencyGauge latencyGauge;
    +
     //
     // Life Cycle
     //
     @Override
    -public void setup(StreamTask containingTask, StreamConfig config, Output output) {
    +public void setup(StreamTask containingTask, StreamConfig config, Output output, boolean isSink) {
         this.container = containingTask;
         this.config = config;
         String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
         this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
         this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
    +    this.isSink = isSink;
    +    Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
    +    int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
    +    if(historySize <= 0) {
    --- End diff --

    Missing space after `if`.
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996378

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
    @@ -149,6 +154,8 @@
     private LinkedHashSet registeredPojoTypes = new LinkedHashSet<>();
    +
    --- End diff --

    New lines should be removed.
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996178

    --- Diff: docs/monitoring/metrics.md ---
    @@ -475,52 +474,76 @@ Flink exposes the following system metrics:
    -Task
    -currentLowWatermark
    -The lowest watermark a task has received.
    -lastCheckpointDuration
    -The time it took to complete the last checkpoint.
    -lastCheckpointSize
    -The total size of the last checkpoint.
    -restartingTime
    -The time it took to restart the job.
    -numBytesInLocal
    -The total number of bytes this task has read from a local source.
    -numBytesInRemote
    -The total number of bytes this task has read from a remote source.
    -numBytesOut
    -The total number of bytes this task has emitted.
    -Operator
    -numRecordsIn
    -The total number of records this operator has received.
    -numRecordsOut
    -The total number of records this operator has emitted.
    -numSplitsProcessed
    -The total number of InputSplits this data source has processed.
    +  Task
    +  currentLowWatermark
    +  The lowest watermark a task has received.
    +  lastCheckpointDuration
    +  The time it took to complete the last checkpoint.
    +  lastCheckpointSize
    +  The total size of the last checkpoint.
    +  restartingTime
    +  The time it took to restart the job.
    +  numBytesInLocal
    +  The total number of bytes this task has read from a local source.
    +  numBytesInRemote
    +  The total number of bytes this task has read from a remote source.
    +  numBytesOut
    +  The total number of bytes this task has emitted.
    +  Operator
    +  numRecordsIn
    +  The total number of records this operator has received.
    +  numRecordsOut
    +  The total number of records this operator has emitted.
    +  numSplitsProcessed
    +  The total number of InputSplits this data source has processed (if the operator is a data source).
    +  latency
    +  A latency gauge reporting the latency distribution from the different sources.
    +
    +### Latency tracking
    +
    +Flink allows to track the latency of records traveling through the system. To enable the latency tracking
    +a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`.
    +
    +At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMaker`.
    +The marker contains a timestamp from the time when the record has been emitted at the sources.
    +Latency marker can not overtake regular user records, thus if records are queuing up in front of an operator,
    --- End diff --

    marker -> markers
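The documentation text quoted above says that latency markers cannot overtake regular user records. A toy model of a single operator's input queue illustrates why queued records then show up in the measured latency. Everything here is an assumption made for illustration: `MarkerQueueSketch`, its `Element` type, and the fixed per-record processing cost are hypothetical and do not come from Flink.

```java
import java.util.Deque;

// Hypothetical model of one operator's input queue; not Flink code.
final class MarkerQueueSketch {

    /** A queue entry: either a regular user record or a latency marker. */
    static final class Element {
        final boolean isMarker;
        final long emitTime; // emission timestamp at the source, only used for markers

        private Element(boolean isMarker, long emitTime) {
            this.isMarker = isMarker;
            this.emitTime = emitTime;
        }

        static Element record() {
            return new Element(false, 0L);
        }

        static Element marker(long emitTime) {
            return new Element(true, emitTime);
        }
    }

    /**
     * Drains the queue strictly in FIFO order, charging costPerRecord milliseconds
     * for every user record, and returns the latency seen by the first marker
     * (or -1 if the queue holds no marker).
     */
    static long firstMarkerLatency(Deque<Element> queue, long startTime, long costPerRecord) {
        long now = startTime;
        while (!queue.isEmpty()) {
            Element element = queue.poll();
            if (element.isMarker) {
                // The marker waited behind every record that was queued before it.
                return now - element.emitTime;
            }
            now += costPerRecord;
        }
        return -1L;
    }
}
```

In this model, three records queued ahead of a marker at 10 ms each yield a marker latency of 30 ms, whereas a marker arriving at an empty queue reports 0 ms.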
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996152

    --- Diff: docs/monitoring/metrics.md ---
    @@ -475,52 +474,76 @@ Flink exposes the following system metrics:
    -Task
    -currentLowWatermark
    -The lowest watermark a task has received.
    -lastCheckpointDuration
    -The time it took to complete the last checkpoint.
    -lastCheckpointSize
    -The total size of the last checkpoint.
    -restartingTime
    -The time it took to restart the job.
    -numBytesInLocal
    -The total number of bytes this task has read from a local source.
    -numBytesInRemote
    -The total number of bytes this task has read from a remote source.
    -numBytesOut
    -The total number of bytes this task has emitted.
    -Operator
    -numRecordsIn
    -The total number of records this operator has received.
    -numRecordsOut
    -The total number of records this operator has emitted.
    -numSplitsProcessed
    -The total number of InputSplits this data source has processed.
    +  Task
    +  currentLowWatermark
    +  The lowest watermark a task has received.
    +  lastCheckpointDuration
    +  The time it took to complete the last checkpoint.
    +  lastCheckpointSize
    +  The total size of the last checkpoint.
    +  restartingTime
    +  The time it took to restart the job.
    +  numBytesInLocal
    +  The total number of bytes this task has read from a local source.
    +  numBytesInRemote
    +  The total number of bytes this task has read from a remote source.
    +  numBytesOut
    +  The total number of bytes this task has emitted.
    +  Operator
    +  numRecordsIn
    +  The total number of records this operator has received.
    +  numRecordsOut
    +  The total number of records this operator has emitted.
    +  numSplitsProcessed
    +  The total number of InputSplits this data source has processed (if the operator is a data source).
    +  latency
    +  A latency gauge reporting the latency distribution from the different sources.
    +
    +### Latency tracking
    +
    +Flink allows to track the latency of records traveling through the system. To enable the latency tracking
    +a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`.
    +
    +At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMaker`.
    --- End diff --

    LatencyMaker -> LatencyMarker
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/2386

    [FLINK-3660] Measure latency and exposes them via a metric

    This commit adds the initial runtime support for measuring the latency of records going through the system.

    I therefore introduced a new StreamElement, called a LatencyMarker.
    Similar to Watermarks, LatencyMarkers are emitted from the sources at a configured interval. The default value for the interval is 2000 ms. The emission of markers can be disabled by setting the interval to 0. LatencyMarkers cannot "overtake" regular elements. This ensures that the measured latency approximates the end-to-end latency of regular stream elements.

    Regular operators (excluding those participating in iterations) forward latency markers if they are not a sink.
    Operators with many outputs randomly select one to forward the marker to. This ensures that every marker exists only once in the system, and that repartition steps do not cause an explosion in the number of transferred markers.

    If an operator is a sink, it will maintain the last 512 latencies from each known source instance.
    The min/max/mean/p50/p95/p99 of each known source are reported using a special LatencyGauge from the sink (every operator can be a sink, if it doesn't have any outputs).

    This commit does not visualize the latency in the web interface.

    Also, there is currently no mechanism to ensure that the system clocks are in sync, so the latency measurements will be inaccurate if the hardware clocks are not correct.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink3660-pr

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2386.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2386
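The pull request description says a sink keeps the last 512 latencies per known source instance and reports min/max/mean/p50/p95/p99 over them. A bounded history with those statistics could be sketched as follows. This is not the PR's `LatencyGauge`: the class `LatencyHistorySketch` is hypothetical, and the nearest-rank percentile method is an assumption, since the description does not say how the percentiles are computed.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical bounded latency history; not the PR's LatencyGauge.
final class LatencyHistorySketch {
    private final int capacity;
    private final Deque<Long> window = new ArrayDeque<>();

    LatencyHistorySketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Records one latency, evicting the oldest value once the window is full. */
    void add(long latencyMillis) {
        if (window.size() == capacity) {
            window.removeFirst();
        }
        window.addLast(latencyMillis);
    }

    int size() {
        return window.size();
    }

    long min() {
        return sorted()[0];
    }

    long max() {
        long[] values = sorted();
        return values[values.length - 1];
    }

    double mean() {
        long[] values = sorted();
        long sum = 0;
        for (long value : values) {
            sum += value;
        }
        return (double) sum / values.length;
    }

    /** Nearest-rank percentile for p in (0, 100]; an assumed method, see the note above. */
    long percentile(double p) {
        long[] values = sorted();
        int rank = (int) Math.ceil(p / 100.0 * values.length);
        return values[Math.max(rank, 1) - 1];
    }

    /** Returns a sorted copy of the window; fails if nothing has been recorded yet. */
    private long[] sorted() {
        if (window.isEmpty()) {
            throw new IllegalStateException("no latencies recorded");
        }
        long[] values = new long[window.size()];
        int i = 0;
        for (long value : window) {
            values[i++] = value;
        }
        Arrays.sort(values);
        return values;
    }
}
```

A sink in this model would hold one such history per source instance, created with a capacity of 512 to match the description; sorting on every read keeps the sketch short at the cost of efficiency.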