[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15575182#comment-15575182 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2386

    Thanks a lot for reviewing it again!

> Measure latency of elements and expose it through web interface
> ---
>
>                 Key: FLINK-3660
>                 URL: https://issues.apache.org/jira/browse/FLINK-3660
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>             Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the
> web interface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp
> at the sources to each record.
> However, this introduces overhead for a monitoring feature users might not
> even use (8 bytes for each element + System.currentTimeMillis() on each
> element).
> Therefore, I suggest implementing this feature by periodically sending
> special events, similar to watermarks, through the topology.
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources
> and forwarded by the tasks. The sinks will compare the timestamp of the
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be
> delayed similarly to regular records, so their latency will approximate the
> record latency.
> The above suggestion expects the clocks on all TaskManagers to be in sync.
> Otherwise, the measured latencies would also include the offsets between the
> TaskManagers' clocks.
> In a second step, we can try to mitigate the issue by using the JobManager as
> a central timing service. The TaskManagers will periodically query the JM for
> the current time in order to determine the offset with their clock.
> This offset would still include the network latency between TM and JM but it
> would still lead to reasonably good estimations.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
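The measurement idea in the issue description can be illustrated with a small, self-contained Java sketch. It is not the code from the pull request: the class `LatencyMarkSketch`, the type `LatencyMark` and all method names are hypothetical, and the clocks are passed in as suppliers so that the arithmetic is easy to follow. The offset helper is the naive variant described in the issue, so the delay of the reply from the central time service is still part of the result.

```java
import java.util.function.LongSupplier;

/** Illustrative sketch only; these are not Flink's classes. */
final class LatencyMarkSketch {

    /** Hypothetical mark: carries only the source's wall-clock time at emission. */
    static final class LatencyMark {
        final long emittedAtMillis;

        LatencyMark(long emittedAtMillis) {
            this.emittedAtMillis = emittedAtMillis;
        }
    }

    /** Source side: stamp a mark using the source's clock. */
    static LatencyMark emit(LongSupplier sourceClock) {
        return new LatencyMark(sourceClock.getAsLong());
    }

    /** Sink side: latency is the sink's current time minus the mark's timestamp. */
    static long latencyMillis(LatencyMark mark, LongSupplier sinkClock) {
        return sinkClock.getAsLong() - mark.emittedAtMillis;
    }

    /**
     * Naive clock offset against a central time service: the reported central time
     * minus the local time at which the reply arrived. The network delay of the
     * reply is included in the result.
     */
    static long clockOffsetMillis(long reportedCentralTime, long localTimeAtReply) {
        return reportedCentralTime - localTimeAtReply;
    }

    public static void main(String[] args) {
        LatencyMark mark = emit(() -> 1_000L);             // source clock reads 1000 ms
        long latency = latencyMillis(mark, () -> 1_250L);  // sink clock reads 1250 ms
        System.out.println("latency = " + latency + " ms"); // latency = 250 ms

        long offset = clockOffsetMillis(5_000L, 5_030L);
        System.out.println("offset = " + offset + " ms");   // offset = -30 ms
    }
}
```

If the two clocks disagree, the computed latency is shifted by exactly that disagreement, which is why the issue proposes estimating an offset per TaskManager as a second step.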
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15575175#comment-15575175 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2386

    This looks good to merge now!
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15572276#comment-15572276 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2386

    I had an offline discussion with @aljoscha about this PR, and we decided to remove the `isSink` flag from the `AbstractStreamOperator`.
    I updated the PR.
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571815#comment-15571815 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r83207030

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -273,7 +276,7 @@ public void releaseOutputs() {
             // now create the operator and give it the output collector to write its output to
             OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
    -        chainedOperator.setup(containingTask, operatorConfig, output);
    +        chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
    --- End diff --

    The reason why we still differentiate between those is that we don't want to keep the latency statistics for all parallel source instances at the intermediate operators.

    So in the current implementation:
    - Intermediate operators maintain latency statistics for each logical source (so not much data)
    - Sinks maintain latency statistics for all parallel instances of each logical source (a bit more data)

    The sinks maintain the additional data so that users can debug latency issues with individual parallel instances (for example, when a machine running a source has issues).
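The difference in granularity described in the comment above can be sketched as follows. This is a hypothetical illustration, not the code from the pull request: the class `LatencyStatsSketch`, the `sourceId#subtask` key format and the count/sum statistics are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only; names and key format are hypothetical. */
final class LatencyStatsSketch {

    private final boolean isSink;
    // key -> {sample count, summed latency in ms}
    private final Map<String, long[]> stats = new HashMap<>();

    LatencyStatsSketch(boolean isSink) {
        this.isSink = isSink;
    }

    /** Sinks key by source and parallel subtask; other operators key by source only. */
    void report(String sourceId, int sourceSubtask, long latencyMillis) {
        String key = isSink ? sourceId + "#" + sourceSubtask : sourceId;
        long[] entry = stats.computeIfAbsent(key, k -> new long[2]);
        entry[0]++;
        entry[1] += latencyMillis;
    }

    /** Number of distinct statistics this operator has to keep. */
    int trackedKeys() {
        return stats.size();
    }

    /** Mean latency for a key, or NaN if nothing was reported under that key. */
    double meanLatencyMillis(String key) {
        long[] entry = stats.get(key);
        return entry == null ? Double.NaN : (double) entry[1] / entry[0];
    }

    public static void main(String[] args) {
        LatencyStatsSketch intermediate = new LatencyStatsSketch(false);
        LatencyStatsSketch sink = new LatencyStatsSketch(true);
        // One logical source running with four parallel instances.
        for (int subtask = 0; subtask < 4; subtask++) {
            intermediate.report("source-A", subtask, 10L * (subtask + 1));
            sink.report("source-A", subtask, 10L * (subtask + 1));
        }
        System.out.println(intermediate.trackedKeys()); // 1
        System.out.println(sink.trackedKeys());         // 4
    }
}
```

With one logical source and four parallel instances, the intermediate operator keeps a single entry while the sink keeps four, so a slow source instance only shows up in the sink's statistics.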
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571767#comment-15571767 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r83203023

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -273,7 +276,7 @@ public void releaseOutputs() {
             // now create the operator and give it the output collector to write its output to
             OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
    -        chainedOperator.setup(containingTask, operatorConfig, output);
    +        chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
    --- End diff --

    Do we need the differentiation between sinks/non-sinks anymore now that we decided to report the latency for all operators?
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571752#comment-15571752 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2386

    Thank you for the review @zentol and @aljoscha. I addressed your comments and rebased the code to the current master.
    I'm currently testing the change again locally. Once that is completed successfully, I would like to merge the change. This pull request has been open for a long time, and I don't want to rebase it again ;)
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571741#comment-15571741 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r83201419

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -273,7 +276,7 @@ public void releaseOutputs() {
             // now create the operator and give it the output collector to write its output to
             OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
    -        chainedOperator.setup(containingTask, operatorConfig, output);
    +        chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
    --- End diff --

    We would not need to set this as a value in the `StreamConfig` class, because it's basically a check whether `StreamConfig.getOutEdgesInOrder().size() == 0`.
    The problem is that `getOutEdgesInOrder()` expects the user code classloader. The classloader is usually not available anymore in the operators.
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476406#comment-15476406 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r78149051

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -322,6 +346,151 @@ public final ChainingStrategy getChainingStrategy() {
             return chainingStrategy;
         }
    +
    +    //
    +    //  Metrics
    +    //
    +
    +    // --- One input stream
    +    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
    +        reportOrForwardLatencyMarker(latencyMarker);
    +    }
    +
    +    // --- Two input stream
    +    public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
    +        reportOrForwardLatencyMarker(latencyMarker);
    +    }
    +
    +    public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
    +        reportOrForwardLatencyMarker(latencyMarker);
    +    }
    +
    +
    +    protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
    +        if(isSink) {
    --- End diff --

    I forgot to push my latest commit, where this is fixed ;)
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476388#comment-15476388 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r78148192

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -125,6 +126,11 @@ public void processWatermark(Watermark mark) throws Exception {
         }

         @Override
    +    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
    --- End diff --

    Why is this necessary?
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476390#comment-15476390 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r78148223

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -322,6 +346,151 @@ public final ChainingStrategy getChainingStrategy() {
             return chainingStrategy;
         }
    +
    +    //
    +    //  Metrics
    +    //
    +
    +    // --- One input stream
    +    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
    +        reportOrForwardLatencyMarker(latencyMarker);
    +    }
    +
    +    // --- Two input stream
    +    public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
    +        reportOrForwardLatencyMarker(latencyMarker);
    +    }
    +
    +    public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
    +        reportOrForwardLatencyMarker(latencyMarker);
    +    }
    +
    +
    +    protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
    +        if(isSink) {
    --- End diff --

    Don't all operators now keep the latency statistics?
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476381#comment-15476381 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r78147868

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java ---
    @@ -95,11 +100,17 @@ public DirectedOutput(
         @Override
         public void emitWatermark(Watermark mark) {
    -        for (Output<StreamRecord<OUT>> out : allOutputs)
    +        for (Output<StreamRecord<OUT>> out: allOutputs) {
    --- End diff --

    I think the space was good here.
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473775#comment-15473775 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77997800

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -273,7 +276,7 @@ public void releaseOutputs() {
             // now create the operator and give it the output collector to write its output to
             OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
    -        chainedOperator.setup(containingTask, operatorConfig, output);
    +        chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
    --- End diff --

    Could we not set this value in the `operatorConfig` instead?
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473764#comment-15473764 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77997086

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java ---
    @@ -83,6 +83,7 @@ public void processWatermark2(Watermark mark) throws Exception {
         }
       }
    +
    --- End diff --

    unnecessary new line
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473762#comment-15473762 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77997021

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -347,4 +521,5 @@ public void close() {
           output.close();
         }
       }
    +
    --- End diff --

    unnecessary new line
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473760#comment-15473760 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996891

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -94,20 +102,38 @@
       /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
       private transient KeyedStateBackend keyedStateBackend;
    -  protected transient MetricGroup metrics;
    +  // --- Metrics ---
    +
    +  /** Metric group for the operator */
    +  protected MetricGroup metrics;
    +
    +  /** Flag indicating if this operator is a sink */
    +  protected transient boolean isSink = false;
    +
    +  protected LatencyGauge latencyGauge;
    +
       //
       // Life Cycle
       //
       @Override
    -  public void setup(StreamTask containingTask, StreamConfig config, Output output) {
    +  public void setup(StreamTask containingTask, StreamConfig config, Output output, boolean isSink) {
         this.container = containingTask;
         this.config = config;
         String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
         this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
         this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
    +    this.isSink = isSink;
    +    Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
    +    int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
    +    if(historySize <= 0) {
    +      LOG.warn("{} has been set to a value below 0: {}. Using default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize);
    +      historySize = ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE;
    +    }
    +
    +    latencyGauge = new LatencyGauge(this.metrics, historySize, !isSink);
    --- End diff --

    this looks a bit odd; `latencyGauge = this.metrics.gauge(new LatencyGauge(historySize, isSink), "latency")` would be more consistent with how other metrics are registered.
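The diff above reads a history size from the configuration, falls back to a default when the value is not positive, and hands the size to a `LatencyGauge`. A self-contained sketch of that idea follows. It assumes a hypothetical `BoundedLatencyHistory` class and a made-up default of 128; the actual `LatencyGauge` internals and the value of `DEFAULT_METRICS_LATENCY_HISTORY_SIZE` are not visible in the quoted hunk.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class LatencyHistorySketch {

    /** Assumed default for illustration; the real constant's value is not shown in the diff. */
    public static final int DEFAULT_HISTORY_SIZE = 128;

    /** Mirrors the check in the diff: zero and negative values fall back to the default. */
    public static int effectiveHistorySize(int configured) {
        return configured <= 0 ? DEFAULT_HISTORY_SIZE : configured;
    }

    /** Hypothetical bounded window over the most recent latency samples. */
    public static final class BoundedLatencyHistory {
        private final int capacity;
        private final Deque<Long> samples = new ArrayDeque<>();

        public BoundedLatencyHistory(int capacity) {
            if (capacity <= 0) {
                throw new IllegalArgumentException("capacity must be positive");
            }
            this.capacity = capacity;
        }

        /** Adds a sample and evicts the oldest one once the window is full. */
        public void report(long latencyMillis) {
            if (samples.size() == capacity) {
                samples.removeFirst();
            }
            samples.addLast(latencyMillis);
        }

        public long min() {
            return samples.stream().mapToLong(Long::longValue).min().orElse(0L);
        }

        public long max() {
            return samples.stream().mapToLong(Long::longValue).max().orElse(0L);
        }

        public double mean() {
            return samples.stream().mapToLong(Long::longValue).average().orElse(0.0);
        }
    }

    public static void main(String[] args) {
        BoundedLatencyHistory history = new BoundedLatencyHistory(effectiveHistorySize(3));
        for (long sample : new long[] {10L, 20L, 30L, 40L}) {
            history.report(sample);
        }
        // The oldest sample (10) has been evicted, so the window holds 20, 30, 40.
        System.out.println(history.min() + " " + history.max() + " " + history.mean());
    }
}
```

Note that the check in the diff is `historySize <= 0`, so a configured value of exactly 0 also triggers the fallback even though the log message says "below 0".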
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473754#comment-15473754 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996545

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -94,20 +102,38 @@
       /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
       private transient KeyedStateBackend keyedStateBackend;
    -  protected transient MetricGroup metrics;
    +  // --- Metrics ---
    +
    +  /** Metric group for the operator */
    +  protected MetricGroup metrics;
    +
    +  /** Flag indicating if this operator is a sink */
    +  protected transient boolean isSink = false;
    +
    +  protected LatencyGauge latencyGauge;
    +
       //
       // Life Cycle
       //
       @Override
    -  public void setup(StreamTask containingTask, StreamConfig config, Output output) {
    +  public void setup(StreamTask containingTask, StreamConfig config, Output output, boolean isSink) {
         this.container = containingTask;
         this.config = config;
         String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
         this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
         this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
    +    this.isSink = isSink;
    +    Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
    +    int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
    +    if(historySize <= 0) {
    --- End diff --

    missing space after if
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473751#comment-15473751 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996378

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
    @@ -149,6 +154,8 @@
       private LinkedHashSet registeredPojoTypes = new LinkedHashSet<>();
    +
    --- End diff --

    new lines should be removed
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473745#comment-15473745 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996152

    --- Diff: docs/monitoring/metrics.md ---
    @@ -475,52 +474,76 @@ Flink exposes the following system metrics:
    -Task
    -currentLowWatermark
    -The lowest watermark a task has received.
    -lastCheckpointDuration
    -The time it took to complete the last checkpoint.
    -lastCheckpointSize
    -The total size of the last checkpoint.
    -restartingTime
    -The time it took to restart the job.
    -numBytesInLocal
    -The total number of bytes this task has read from a local source.
    -numBytesInRemote
    -The total number of bytes this task has read from a remote source.
    -numBytesOut
    -The total number of bytes this task has emitted.
    -Operator
    -numRecordsIn
    -The total number of records this operator has received.
    -numRecordsOut
    -The total number of records this operator has emitted.
    -numSplitsProcessed
    -The total number of InputSplits this data source has processed.
    + Task
    + currentLowWatermark
    + The lowest watermark a task has received.
    + lastCheckpointDuration
    + The time it took to complete the last checkpoint.
    + lastCheckpointSize
    + The total size of the last checkpoint.
    + restartingTime
    + The time it took to restart the job.
    + numBytesInLocal
    + The total number of bytes this task has read from a local source.
    + numBytesInRemote
    + The total number of bytes this task has read from a remote source.
    + numBytesOut
    + The total number of bytes this task has emitted.
    + Operator
    + numRecordsIn
    + The total number of records this operator has received.
    + numRecordsOut
    + The total number of records this operator has emitted.
    + numSplitsProcessed
    + The total number of InputSplits this data source has processed (if the operator is a data source).
    + latency
    + A latency gauge reporting the latency distribution from the different sources.
    +
    +### Latency tracking
    +
    +Flink allows to track the latency of records traveling through the system. To enable the latency tracking
    +a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`.
    +
    +At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMaker`.
    --- End diff --

    LatencyMaker -> LatencyMarker
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473747#comment-15473747 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996178

    --- Diff: docs/monitoring/metrics.md ---
    @@ -475,52 +474,76 @@ Flink exposes the following system metrics:
    -Task
    -currentLowWatermark
    -The lowest watermark a task has received.
    -lastCheckpointDuration
    -The time it took to complete the last checkpoint.
    -lastCheckpointSize
    -The total size of the last checkpoint.
    -restartingTime
    -The time it took to restart the job.
    -numBytesInLocal
    -The total number of bytes this task has read from a local source.
    -numBytesInRemote
    -The total number of bytes this task has read from a remote source.
    -numBytesOut
    -The total number of bytes this task has emitted.
    -Operator
    -numRecordsIn
    -The total number of records this operator has received.
    -numRecordsOut
    -The total number of records this operator has emitted.
    -numSplitsProcessed
    -The total number of InputSplits this data source has processed.
    + Task
    + currentLowWatermark
    + The lowest watermark a task has received.
    + lastCheckpointDuration
    + The time it took to complete the last checkpoint.
    + lastCheckpointSize
    + The total size of the last checkpoint.
    + restartingTime
    + The time it took to restart the job.
    + numBytesInLocal
    + The total number of bytes this task has read from a local source.
    + numBytesInRemote
    + The total number of bytes this task has read from a remote source.
    + numBytesOut
    + The total number of bytes this task has emitted.
    + Operator
    + numRecordsIn
    + The total number of records this operator has received.
    + numRecordsOut
    + The total number of records this operator has emitted.
    + numSplitsProcessed
    + The total number of InputSplits this data source has processed (if the operator is a data source).
    + latency
    + A latency gauge reporting the latency distribution from the different sources.
    +
    +### Latency tracking
    +
    +Flink allows to track the latency of records traveling through the system. To enable the latency tracking
    +a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`.
    +
    +At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMaker`.
    +The marker contains a timestamp from the time when the record has been emitted at the sources.
    +Latency marker can not overtake regular user records, thus if records are queuing up in front of an operator,
    --- End diff --

    marker -> markers
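The documentation text quoted in the diff says that latency markers cannot overtake regular records, so records queuing up in front of an operator show up in the measured latency. A toy simulation can make that visible. It is a sketch under simplifying assumptions, not Flink code: `Element`, `ofRecord`, `ofMarker` and `drain` are hypothetical names, all elements are assumed to be queued already when draining starts, every record costs a fixed processing time, and markers cost nothing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class MarkerQueueSketch {

    /** Hypothetical queue element: either a user record or a latency marker with a timestamp. */
    public static final class Element {
        final boolean isMarker;
        final long emittedAtMillis;

        private Element(boolean isMarker, long emittedAtMillis) {
            this.isMarker = isMarker;
            this.emittedAtMillis = emittedAtMillis;
        }

        public static Element ofRecord() {
            return new Element(false, 0L);
        }

        public static Element ofMarker(long emittedAtMillis) {
            return new Element(true, emittedAtMillis);
        }
    }

    /**
     * Drains the queue strictly in FIFO order, starting at startMillis.
     * Each record advances the clock by perRecordMillis; each marker reports
     * the time it waited since emission.
     */
    public static List<Long> drain(Queue<Element> input, long startMillis, long perRecordMillis) {
        List<Long> markerLatencies = new ArrayList<>();
        long now = startMillis;
        while (!input.isEmpty()) {
            Element element = input.poll();
            if (element.isMarker) {
                markerLatencies.add(now - element.emittedAtMillis);
            } else {
                now += perRecordMillis;
            }
        }
        return markerLatencies;
    }

    public static void main(String[] args) {
        Queue<Element> input = new ArrayDeque<>();
        input.add(Element.ofRecord());
        input.add(Element.ofRecord());
        input.add(Element.ofRecord());
        input.add(Element.ofMarker(0L));   // stuck behind three records
        input.add(Element.ofRecord());
        input.add(Element.ofMarker(100L)); // stuck behind one more record
        System.out.println(drain(input, 0L, 50L)); // prints [150, 100]
    }
}
```

The first marker reports 150 ms purely because three 50 ms records were ahead of it, which is the queuing effect the docs paragraph describes.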
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473681#comment-15473681 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2386

    @aljoscha I updated the pull request, which now builds green (just rebasing fixed the issues).
    I now also keep metrics per operator, not only for the sinks.
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15448653#comment-15448653 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2386

    I got some offline feedback from @aljoscha regarding the change to implement the methods required by `OneInputStreamOperator` and `TwoInputStreamOperator` in `AbstractStreamOperator` to avoid "polluting" all the custom operators.

    I'm currently working on fixing the `EventTimeWindowCheckpointingITCase`, which seems to be failing more frequently due to my change.
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434595#comment-15434595 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2386

    I agree that the semantics are not properly defined. I'll add a section to the documentation explaining what users can expect from the `LatencyMarks`.
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427964#comment-15427964 ]

ASF GitHub Bot commented on FLINK-3660:
---

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2386

    Thanks for the contribution @rmetzger. I briefly skimmed over the code and noticed that `LatencyMarker`s don't participate in windows. Thus, I was asking myself what latency actually means. Could you please define properly what latency measures and how this can be a useful metric for users (especially with respect to windows and window functions)?
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427949#comment-15427949 ]
ASF GitHub Bot commented on FLINK-3660:
---
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2386
Good point, thank you! I'm not sure yet whether I'll keep the format of the metrics like this. I find those nested HashMaps quite clumsy. Once this PR and the metrics-in-webinterface PR are merged, I'll add the latency to the web interface as well. Then I can make a final decision on the format.
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427947#comment-15427947 ]
ASF GitHub Bot commented on FLINK-3660:
---
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2386
The metrics documentation contains a list of all measured metrics; we should add this metric there as well.
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426825#comment-15426825 ]
ASF GitHub Bot commented on FLINK-3660:
---
GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/2386
[FLINK-3660] Measure latency and exposes them via a metric

This commit adds the initial runtime support for measuring the latency of records going through the system. I therefore introduced a new StreamElement, called a LatencyMarker.

Similar to Watermarks, LatencyMarkers are emitted from the sources at a configured interval. The default value for the interval is 2000 ms. The emission of markers can be disabled by setting the interval to 0.

LatencyMarkers cannot "overtake" regular elements. This ensures that the measured latency approximates the end-to-end latency of regular stream elements.

Regular operators (excluding those participating in iterations) forward latency markers if they are not a sink. Operators with many outputs randomly select one to forward the marker to. This ensures that every marker exists only once in the system, and that repartition steps do not cause an explosion in the number of transferred markers.

If an operator is a sink, it will maintain the last 512 latencies from each known source instance. The min/max/mean/p50/p95/p99 of each known source is reported using a special LatencyGauge from the sink (every operator can be a sink, if it doesn't have any outputs).

This commit does not visualize the latency in the web interface. Also, there is currently no mechanism to ensure that the system clocks are in sync, so the latency measurements will be inaccurate if the hardware clocks are not correct.

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rmetzger/flink flink3660-pr
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2386.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #2386
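The sink-side bookkeeping outlined in the pull request description (keep the last 512 latencies per known source instance, then report min/max/mean and percentiles) could look roughly like the following. This is only a minimal sketch under stated assumptions: the class name `LatencySketch`, the `String` source identifier, the ring buffer, and the nearest-rank percentile are all hypothetical choices made for illustration and are not taken from the pull request, whose actual implementation may differ.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of sink-side latency bookkeeping; not the code from the pull request. */
final class LatencySketch {
    /** Number of recent latencies kept per source, as stated in the pull request description. */
    static final int WINDOW_SIZE = 512;

    /** Ring buffer holding the most recent latencies of one source instance. */
    private static final class Window {
        final long[] values = new long[WINDOW_SIZE];
        int next;   // slot the next latency is written to
        int count;  // number of valid slots, at most WINDOW_SIZE

        void add(long latencyMillis) {
            values[next] = latencyMillis;
            next = (next + 1) % WINDOW_SIZE;
            count = Math.min(count + 1, WINDOW_SIZE);
        }

        long[] sorted() {
            // Valid entries always occupy slots 0..count-1, so a prefix copy is enough.
            long[] copy = Arrays.copyOf(values, count);
            Arrays.sort(copy);
            return copy;
        }
    }

    private final Map<String, Window> windows = new HashMap<>();

    /** Called when a marker arrives: latency = current time minus the marker's timestamp. */
    void record(String sourceId, long markerTimestampMillis, long nowMillis) {
        windows.computeIfAbsent(sourceId, id -> new Window())
               .add(nowMillis - markerTimestampMillis);
    }

    private long[] sortedLatencies(String sourceId) {
        Window w = windows.get(sourceId);
        if (w == null || w.count == 0) {
            throw new IllegalArgumentException("unknown source: " + sourceId);
        }
        return w.sorted();
    }

    long min(String sourceId) {
        return sortedLatencies(sourceId)[0];
    }

    long max(String sourceId) {
        long[] s = sortedLatencies(sourceId);
        return s[s.length - 1];
    }

    double mean(String sourceId) {
        long[] s = sortedLatencies(sourceId);
        long sum = 0;
        for (long v : s) {
            sum += v;
        }
        return (double) sum / s.length;
    }

    /** Nearest-rank percentile (an assumed definition), for p in (0, 100]. */
    long percentile(String sourceId, double p) {
        long[] s = sortedLatencies(sourceId);
        int rank = (int) Math.ceil(p / 100.0 * s.length);
        return s[Math.min(Math.max(rank, 1), s.length) - 1];
    }
}
```

A sink would call `record(sourceId, marker timestamp, current time)` for every arriving marker and read the statistics when the metric is polled. Because the computed latency is a difference of two wall-clock readings taken on different machines, it inherits any clock offset between them, which is the inaccuracy the description warns about.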
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15295688#comment-15295688 ]
Stephan Ewen commented on FLINK-3660:
---
I would break this into two tasks:
- measure latency and expose it via a metric
- visualize that metric in the Web Dashboard