[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15575182#comment-15575182
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2386
  
Thanks a lot for reviewing it again!


> Measure latency of elements and expose it through web interface
> ---
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the 
> web interface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> to each record at the sources.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes for each element + System.currentTimeMillis() on each 
> element).
> Therefore, I suggest implementing this feature by periodically sending 
> special events through the topology, similar to watermarks.
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be 
> delayed similarly to regular records, so their latency will approximate the 
> record latency.
> The above suggestion assumes that the clocks on all TaskManagers are in sync. 
> Otherwise, the measured latencies would also include the offsets between the 
> TaskManagers' clocks.
> In a second step, we can try to mitigate the issue by using the JobManager as 
> a central timing service. The TaskManagers will periodically query the JM for 
> the current time in order to determine the offset between their own clock and the JM's.
> This offset would still include the network latency between TM and JM, but it 
> would still lead to reasonably good estimates.
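
A minimal Java sketch of the LatencyMarks scheme described above, assuming simplified stand-in types: `SimpleLatencyMarker`, `OffsetClock` and `LatencyMarkerDemo` are hypothetical names, not Flink classes. A source stamps each marker with its clock, a sink subtracts that timestamp from its own clock, and each TaskManager shifts its local clock by an offset estimated from one time query to a reference clock (for example the JobManager's). The offset estimate assumes the reference reading was taken at the midpoint of the round trip (Cristian's algorithm); that is a swapped-in refinement, since the description above only states that the offset would still include the TM/JM network latency.

```java
import java.util.function.LongSupplier;

/** Hypothetical, simplified latency marker (not Flink's LatencyMarker class). */
final class SimpleLatencyMarker {
    final long markedTime;  // corrected time at the source, in ms
    final int sourceId;     // hypothetical id of the logical source
    final int subtaskIndex; // parallel source instance that emitted the marker

    SimpleLatencyMarker(long markedTime, int sourceId, int subtaskIndex) {
        this.markedTime = markedTime;
        this.sourceId = sourceId;
        this.subtaskIndex = subtaskIndex;
    }
}

/** A TaskManager-local clock shifted by an estimated offset to a reference clock. */
final class OffsetClock {
    private final LongSupplier localClock;
    private long offsetMillis; // estimated (reference time - local time)

    OffsetClock(LongSupplier localClock) {
        this.localClock = localClock;
    }

    /**
     * Cristian-style estimate: treat the reference reading as taken halfway through
     * the round trip, so symmetric network delay cancels out (asymmetric delay does not).
     */
    void updateOffset(long requestSentLocal, long referenceTime, long responseReceivedLocal) {
        long midpoint = requestSentLocal + (responseReceivedLocal - requestSentLocal) / 2;
        offsetMillis = referenceTime - midpoint;
    }

    long offset() {
        return offsetMillis;
    }

    /** Local time translated onto the reference clock. */
    long now() {
        return localClock.getAsLong() + offsetMillis;
    }
}

final class LatencyMarkerDemo {
    /** Source side: stamp a new marker with the corrected current time. */
    static SimpleLatencyMarker emit(OffsetClock sourceClock, int sourceId, int subtaskIndex) {
        return new SimpleLatencyMarker(sourceClock.now(), sourceId, subtaskIndex);
    }

    /** Sink side: latency = corrected sink time - marker timestamp. */
    static long latencyAtSink(OffsetClock sinkClock, SimpleLatencyMarker marker) {
        return sinkClock.now() - marker.markedTime;
    }
}
```

If two TaskManagers' raw clocks disagree by seconds, subtracting raw readings would mostly measure that skew; once both clocks are mapped onto the reference clock, the difference approximates the transit time, up to any asymmetry in the TM/JM network delay.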



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15575175#comment-15575175
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2386
  
This looks good to merge now!  


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-10-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15572276#comment-15572276
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2386
  
I had an offline discussion with @aljoscha about this PR, and we decided to 
remove the `isSink` flag from the `AbstractStreamOperator`.
I updated the PR.


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-10-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571815#comment-15571815
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r83207030
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -273,7 +276,7 @@ public void releaseOutputs() {
 
// now create the operator and give it the output collector to write its output to
OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
-   chainedOperator.setup(containingTask, operatorConfig, output);
+   chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
--- End diff --

We still differentiate between the two because we don't want to keep the latency statistics for all parallel source instances at the intermediate operators.
So in the current implementation:
- Intermediate operators maintain latency statistics for each logical source (so not much data).
- Sinks maintain latency statistics for all parallel instances of each logical source (a bit more data).
The sinks maintain the additional data to allow users to debug latency issues with individual parallel instances (for example, when a machine running a source has issues).
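
The two levels of detail described in this comment can be sketched as a single tracker whose map key either folds all parallel instances of a logical source together (intermediate operators) or keeps them apart (sinks). `LatencyStats`, `SourceKey` and `LatencyTracker` are hypothetical names, and the count/min/max/mean summary is an assumption: the comment does not say which statistics are kept.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Running count/min/max/mean of latencies in ms (hypothetical, not Flink's metric classes). */
final class LatencyStats {
    private long count;
    private long sum;
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    void add(long latencyMs) {
        count++;
        sum += latencyMs;
        min = Math.min(min, latencyMs);
        max = Math.max(max, latencyMs);
    }

    long count() { return count; }
    long min() { return min; }
    long max() { return max; }
    double mean() { return count == 0 ? 0.0 : (double) sum / count; }
}

/** Map key: a logical source, optionally narrowed to one parallel instance. */
final class SourceKey {
    static final int ALL_SUBTASKS = -1; // placeholder index when instances are aggregated
    final int sourceId;
    final int subtaskIndex;

    SourceKey(int sourceId, int subtaskIndex) {
        this.sourceId = sourceId;
        this.subtaskIndex = subtaskIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SourceKey)) return false;
        SourceKey other = (SourceKey) o;
        return sourceId == other.sourceId && subtaskIndex == other.subtaskIndex;
    }

    @Override
    public int hashCode() {
        return Objects.hash(sourceId, subtaskIndex);
    }
}

/** Intermediate operators aggregate per logical source; sinks keep one entry per source instance. */
final class LatencyTracker {
    private final boolean perSourceInstance;
    private final Map<SourceKey, LatencyStats> stats = new HashMap<>();

    LatencyTracker(boolean perSourceInstance) {
        this.perSourceInstance = perSourceInstance;
    }

    private SourceKey keyFor(int sourceId, int subtaskIndex) {
        return new SourceKey(sourceId, perSourceInstance ? subtaskIndex : SourceKey.ALL_SUBTASKS);
    }

    void record(int sourceId, int subtaskIndex, long latencyMs) {
        stats.computeIfAbsent(keyFor(sourceId, subtaskIndex), k -> new LatencyStats()).add(latencyMs);
    }

    /** Returns null if nothing was recorded for this key. */
    LatencyStats get(int sourceId, int subtaskIndex) {
        return stats.get(keyFor(sourceId, subtaskIndex));
    }

    int trackedEntries() {
        return stats.size();
    }
}
```

With N parallel source instances, a sink's per-instance map holds up to N entries per logical source while an intermediate operator holds one, which is the "a bit more data" trade-off mentioned above.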


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-10-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571767#comment-15571767
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r83203023
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -273,7 +276,7 @@ public void releaseOutputs() {
 
// now create the operator and give it the output collector to write its output to
OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
-   chainedOperator.setup(containingTask, operatorConfig, output);
+   chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
--- End diff --

Do we need the differentiation between sinks/non-sinks anymore now that we 
decided to report the latency for all operators?


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-10-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571752#comment-15571752
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2386
  
Thank you for the review @zentol and @aljoscha.
I addressed your comments and rebased the code onto the current master. I'm 
currently testing the change again locally. Once that is completed 
successfully, I would like to merge the change. This pull request has been open 
for a long time, and I don't want to rebase it again ;)


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-10-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571741#comment-15571741
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r83201419
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -273,7 +276,7 @@ public void releaseOutputs() {
 
// now create the operator and give it the output collector to write its output to
OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
-   chainedOperator.setup(containingTask, operatorConfig, output);
+   chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
--- End diff --

We would not need to set this as a value in the `StreamConfig` class, 
because it's basically a check whether `StreamConfig.getOutEdgesInOrder().size() == 0`. 
The problem is that `getOutEdgesInOrder()` expects the user code classloader, 
which is usually no longer available in the operators.




[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476406#comment-15476406
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r78149051
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -322,6 +346,151 @@ public final ChainingStrategy getChainingStrategy() {
return chainingStrategy;
}
 
+
+   // ------------------------------------------------------------------------
+   //  Metrics
+   // ------------------------------------------------------------------------
+
+   // --- One input stream
+   public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+       reportOrForwardLatencyMarker(latencyMarker);
+   }
+
+   // --- Two input stream
+   public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
+       reportOrForwardLatencyMarker(latencyMarker);
+   }
+
+   public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
+       reportOrForwardLatencyMarker(latencyMarker);
+   }
+
+
+   protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
+       if(isSink) {
--- End diff --

I forgot to push my latest commit, where this is fixed ;)
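
The `reportOrForwardLatencyMarker` hunk above still branches on `isSink`. According to the 2016-10-13 comments in this thread, the `isSink` flag was later removed from `AbstractStreamOperator` and latency is reported for all operators. A minimal sketch of that shape, using hypothetical types (`ForwardedMarker`, `MarkerOutput`, `MarkerAwareOperator`) rather than Flink's classes: every operator records the latency seen at its input and then forwards the marker, and a sink is simply an operator with no outputs. Forwarding to every output and keeping a plain list of latencies are simplifications made for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical marker carrying only the source timestamp (ms). */
final class ForwardedMarker {
    final long markedTime;

    ForwardedMarker(long markedTime) {
        this.markedTime = markedTime;
    }
}

/** Whatever receives a forwarded marker downstream. */
interface MarkerOutput {
    void emitLatencyMarker(ForwardedMarker marker);
}

/** Hypothetical operator outline; Flink's AbstractStreamOperator is structured differently. */
final class MarkerAwareOperator {
    private final List<MarkerOutput> outputs; // empty for a sink
    private final LongSupplier clock;
    private final List<Long> observedLatencies = new ArrayList<>();

    MarkerAwareOperator(List<MarkerOutput> outputs, LongSupplier clock) {
        this.outputs = outputs;
        this.clock = clock;
    }

    void processLatencyMarker(ForwardedMarker marker) {
        // Every operator records the latency observed at its input ...
        observedLatencies.add(clock.getAsLong() - marker.markedTime);
        // ... then forwards the marker unchanged. A sink has no outputs, so the
        // loop does nothing and no separate isSink flag is consulted.
        for (MarkerOutput out : outputs) {
            out.emitLatencyMarker(marker);
        }
    }

    List<Long> observedLatencies() {
        return Collections.unmodifiableList(observedLatencies);
    }
}
```

Because "is a sink" falls out of the empty output list, nothing has to be computed at setup time, which sidesteps the classloader issue raised for `getOutEdgesInOrder()` in the `OperatorChain` discussion.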


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476388#comment-15476388
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r78148192
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
@@ -125,6 +126,11 @@ public void processWatermark(Watermark mark) throws Exception {
}
 
@Override
+   public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
--- End diff --

Why is this necessary?


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476390#comment-15476390
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r78148223
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -322,6 +346,151 @@ public final ChainingStrategy getChainingStrategy() {
return chainingStrategy;
}
 
+
+   // ------------------------------------------------------------------------
+   //  Metrics
+   // ------------------------------------------------------------------------
+
+   // --- One input stream
+   public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+       reportOrForwardLatencyMarker(latencyMarker);
+   }
+
+   // --- Two input stream
+   public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
+       reportOrForwardLatencyMarker(latencyMarker);
+   }
+
+   public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
+       reportOrForwardLatencyMarker(latencyMarker);
+   }
+
+
+   protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
+       if(isSink) {
--- End diff --

Don't all operators now keep the latency statistics? 


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476381#comment-15476381
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r78147868
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java ---
@@ -95,11 +100,17 @@ public DirectedOutput(
 
@Override
public void emitWatermark(Watermark mark) {
-   for (Output out : allOutputs) {
+   for (Output out: allOutputs) {
--- End diff --

I think the space was good here.


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473775#comment-15473775
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77997800
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -273,7 +276,7 @@ public void releaseOutputs() {
 
// now create the operator and give it the output collector to write its output to
OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
-   chainedOperator.setup(containingTask, operatorConfig, output);
+   chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
--- End diff --

Could we not set this value in the `operatorConfig` instead?


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473764#comment-15473764
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77997086
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java ---
@@ -83,6 +83,7 @@ public void processWatermark2(Watermark mark) throws Exception {
}
}
 
+
--- End diff --

unnecessary new line



[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473762#comment-15473762
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77997021
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -347,4 +521,5 @@ public void close() {
output.close();
}
}
+
--- End diff --

unnecessary new line



[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473760#comment-15473760
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996891
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -94,20 +102,38 @@
/** Backend for keyed state. This might be empty if we're not on a keyed stream. */
private transient KeyedStateBackend keyedStateBackend;
 
-   protected transient MetricGroup metrics;
+   // --- Metrics ---
+
+   /** Metric group for the operator */
+   protected MetricGroup metrics;
+
+   /** Flag indicating if this operator is a sink */
+   protected transient boolean isSink = false;
+
+   protected LatencyGauge latencyGauge;
+
 
// 
//  Life Cycle
// 
 
@Override
-   public void setup(StreamTask containingTask, StreamConfig config, Output output) {
+   public void setup(StreamTask containingTask, StreamConfig config, Output output, boolean isSink) {
this.container = containingTask;
this.config = config;
String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();

this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
+   this.isSink = isSink;
+   Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
+   int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
+   if(historySize <= 0) {
+   LOG.warn("{} has been set to a value below 0: {}. Using default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize);
+   historySize = ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE;
+   }
+
+   latencyGauge = new LatencyGauge(this.metrics, historySize, !isSink);
--- End diff --

this looks a bit odd; `latencyGauge = this.metrics.gauge(new LatencyGauge(historySize, isSink), "latency")` would be more consistent with how other metrics are registered.
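The suggested registration style depends on the metric group's `gauge` method returning the gauge it registers. That is what lets the field be assigned in a single statement.

The sketch below shows the pattern with hypothetical, self-contained stand-ins:

- `Gauge`, a minimal gauge interface.
- `SimpleMetricGroup`, a minimal metric group.
- A simplified `LatencyGauge` that keeps the last `historySize` samples per source and reports their mean.

This `LatencyGauge` is not the one from the PR. The stand-in `gauge` method takes the name first, and the default history size of 128 is an arbitrary placeholder.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical minimal gauge interface. */
interface Gauge<T> {
    T getValue();
}

/** Hypothetical metric group: registers a gauge and hands the same instance back. */
final class SimpleMetricGroup {
    final Map<String, Gauge<?>> gauges = new HashMap<>();

    <T, G extends Gauge<T>> G gauge(String name, G gauge) {
        gauges.put(name, gauge);
        return gauge; // returning the gauge enables "field = group.gauge(...)"
    }
}

/** Simplified latency gauge: bounded history per source, reports the mean per source. */
final class LatencyGauge implements Gauge<Map<String, Double>> {
    static final int DEFAULT_HISTORY_SIZE = 128; // arbitrary placeholder default

    private final int historySize;
    private final Map<String, ArrayDeque<Long>> history = new LinkedHashMap<>();

    LatencyGauge(int historySize) {
        // Fall back to the default for non-positive sizes, mirroring the check in the diff.
        this.historySize = historySize > 0 ? historySize : DEFAULT_HISTORY_SIZE;
    }

    synchronized void report(String sourceId, long latencyMillis) {
        ArrayDeque<Long> samples = history.computeIfAbsent(sourceId, k -> new ArrayDeque<>());
        if (samples.size() == historySize) {
            samples.removeFirst(); // evict the oldest sample
        }
        samples.addLast(latencyMillis);
    }

    @Override
    public synchronized Map<String, Double> getValue() {
        Map<String, Double> means = new LinkedHashMap<>();
        for (Map.Entry<String, ArrayDeque<Long>> e : history.entrySet()) {
            double sum = 0;
            for (long v : e.getValue()) {
                sum += v;
            }
            means.put(e.getKey(), sum / e.getValue().size());
        }
        return means;
    }
}
```

Usage then mirrors the review suggestion: `LatencyGauge latencyGauge = group.gauge("latency", new LatencyGauge(historySize));`.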



[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473754#comment-15473754
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996545
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -94,20 +102,38 @@
/** Backend for keyed state. This might be empty if we're not on a keyed stream. */
private transient KeyedStateBackend keyedStateBackend;
 
-   protected transient MetricGroup metrics;
+   // --- Metrics ---
+
+   /** Metric group for the operator */
+   protected MetricGroup metrics;
+
+   /** Flag indicating if this operator is a sink */
+   protected transient boolean isSink = false;
+
+   protected LatencyGauge latencyGauge;
+
 
// 
//  Life Cycle
// 
 
@Override
-   public void setup(StreamTask containingTask, StreamConfig config, Output output) {
+   public void setup(StreamTask containingTask, StreamConfig config, Output output, boolean isSink) {
this.container = containingTask;
this.config = config;
String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();

this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
+   this.isSink = isSink;
+   Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
+   int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
+   if(historySize <= 0) {
--- End diff --

missing space after if



[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473751#comment-15473751
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996378
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -149,6 +154,8 @@
 
private LinkedHashSet registeredPojoTypes = new LinkedHashSet<>();
 
+
--- End diff --

new lines should be removed



[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473745#comment-15473745
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996152
  
--- Diff: docs/monitoring/metrics.md ---
@@ -475,52 +474,76 @@ Flink exposes the following system metrics:
   
 
 
-  
-Task
-currentLowWatermark
-The lowest watermark a task has received.
-  
-  
-lastCheckpointDuration
-The time it took to complete the last checkpoint.
-  
-  
-lastCheckpointSize
-The total size of the last checkpoint.
-  
-  
-restartingTime
-The time it took to restart the job.
-  
-  
-numBytesInLocal
-The total number of bytes this task has read from a local source.
-  
-  
-numBytesInRemote
-The total number of bytes this task has read from a remote source.
-  
-  
-numBytesOut
-The total number of bytes this task has emitted.
-  
-
-
-  
-Operator
-numRecordsIn
-The total number of records this operator has received.
-  
-  
-numRecordsOut
-The total number of records this operator has emitted.
-  
-  
-numSplitsProcessed
-The total number of InputSplits this data source has processed.
-  
+  Task
+  currentLowWatermark
+  The lowest watermark a task has received.
+
+
+  lastCheckpointDuration
+  The time it took to complete the last checkpoint.
+
+
+  lastCheckpointSize
+  The total size of the last checkpoint.
+
+
+  restartingTime
+  The time it took to restart the job.
+
+
+  numBytesInLocal
+  The total number of bytes this task has read from a local source.
+
+
+  numBytesInRemote
+  The total number of bytes this task has read from a remote source.
+
+
+  numBytesOut
+  The total number of bytes this task has emitted.
+
+
+  Operator
+  numRecordsIn
+  The total number of records this operator has received.
+
+
+  numRecordsOut
+  The total number of records this operator has emitted.
+
+
+  numSplitsProcessed
+  The total number of InputSplits this data source has processed (if the operator is a data source).
+
+
+  latency
+  A latency gauge reporting the latency distribution from the different sources.
 
   
 
 
+
+### Latency tracking
+
+Flink allows to track the latency of records traveling through the system. To enable the latency tracking
+a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`.
+
+At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMaker`.
--- End diff --

LatencyMaker -> LatencyMarker
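This docs hunk describes two rules. Tracking is on only for a positive `latencyTrackingInterval` (in milliseconds), and sources then emit a marker once per interval. Both can be sketched as a small polling check.

`LatencyMarkerTimer` is hypothetical and not part of Flink. A real source might use a scheduled timer instead of checking the clock on every call.

```java
/**
 * Hypothetical source-side helper: decides when the next latency marker is due.
 * A non-positive interval disables latency tracking.
 */
final class LatencyMarkerTimer {
    private final long intervalMillis;
    private long nextEmissionMillis;

    LatencyMarkerTimer(long intervalMillis, long startMillis) {
        this.intervalMillis = intervalMillis;
        this.nextEmissionMillis = startMillis + intervalMillis;
    }

    boolean isEnabled() {
        return intervalMillis > 0;
    }

    /** Returns true if a marker stamped with nowMillis should be emitted now. */
    boolean shouldEmit(long nowMillis) {
        if (!isEnabled() || nowMillis < nextEmissionMillis) {
            return false;
        }
        // Schedule the next marker one interval after this one.
        nextEmissionMillis = nowMillis + intervalMillis;
        return true;
    }
}
```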



[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473747#comment-15473747
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996178
  
--- Diff: docs/monitoring/metrics.md ---
@@ -475,52 +474,76 @@ Flink exposes the following system metrics:
   
 
 
-  
-Task
-currentLowWatermark
-The lowest watermark a task has received.
-  
-  
-lastCheckpointDuration
-The time it took to complete the last checkpoint.
-  
-  
-lastCheckpointSize
-The total size of the last checkpoint.
-  
-  
-restartingTime
-The time it took to restart the job.
-  
-  
-numBytesInLocal
-The total number of bytes this task has read from a local source.
-  
-  
-numBytesInRemote
-The total number of bytes this task has read from a remote source.
-  
-  
-numBytesOut
-The total number of bytes this task has emitted.
-  
-
-
-  
-Operator
-numRecordsIn
-The total number of records this operator has received.
-  
-  
-numRecordsOut
-The total number of records this operator has emitted.
-  
-  
-numSplitsProcessed
-The total number of InputSplits this data source has processed.
-  
+  Task
+  currentLowWatermark
+  The lowest watermark a task has received.
+
+
+  lastCheckpointDuration
+  The time it took to complete the last checkpoint.
+
+
+  lastCheckpointSize
+  The total size of the last checkpoint.
+
+
+  restartingTime
+  The time it took to restart the job.
+
+
+  numBytesInLocal
+  The total number of bytes this task has read from a local source.
+
+
+  numBytesInRemote
+  The total number of bytes this task has read from a remote source.
+
+
+  numBytesOut
+  The total number of bytes this task has emitted.
+
+
+  Operator
+  numRecordsIn
+  The total number of records this operator has received.
+
+
+  numRecordsOut
+  The total number of records this operator has emitted.
+
+
+  numSplitsProcessed
+  The total number of InputSplits this data source has processed (if the operator is a data source).
+
+
+  latency
+  A latency gauge reporting the latency distribution from the different sources.
 
   
 
 
+
+### Latency tracking
+
+Flink allows to track the latency of records traveling through the system. To enable the latency tracking
+a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`.
+
+At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMaker`.
+The marker contains a timestamp from the time when the record has been emitted at the sources.
+Latency marker can not overtake regular user records, thus if records are queuing up in front of an operator, 
--- End diff --

marker -> markers
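This hunk describes a queuing behaviour: markers cannot overtake records, so records waiting in front of an operator also delay the marker. A tiny simulation illustrates it.

`Element` and `FifoChannelSimulation` are hypothetical, and the clock is simulated. Every record is assumed to take a fixed processing time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stream element: either a user record or a latency marker. */
final class Element {
    final boolean isMarker;
    final long markedTime; // source emission time in ms, only used for markers

    private Element(boolean isMarker, long markedTime) {
        this.isMarker = isMarker;
        this.markedTime = markedTime;
    }

    static Element record() {
        return new Element(false, -1L);
    }

    static Element marker(long markedTime) {
        return new Element(true, markedTime);
    }
}

/** Simulates one FIFO input channel in front of an operator, using a simulated clock. */
final class FifoChannelSimulation {

    /** Returns the latency observed for each marker, in arrival order. */
    static List<Long> run(List<Element> queued, long startMillis, long costPerRecordMillis) {
        ArrayDeque<Element> channel = new ArrayDeque<>(queued); // markers wait behind earlier records
        List<Long> latencies = new ArrayList<>();
        long now = startMillis;
        while (!channel.isEmpty()) {
            Element e = channel.pollFirst();
            if (e.isMarker) {
                // Markers skip user code; their latency is the time since emission at the source.
                latencies.add(now - e.markedTime);
            } else {
                // Each record keeps the operator busy for a fixed amount of time.
                now += costPerRecordMillis;
            }
        }
        return latencies;
    }
}
```

Suppose two markers are both emitted at time 0, with three 10 ms records queued between them. The first marker then reports 0 ms and the second reports 30 ms.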



[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473681#comment-15473681
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2386
  
@aljoscha I updated the pull request, which now builds green (just rebasing fixed the issues).
I now also keep metrics per operator, not only for the sinks.



[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15448653#comment-15448653
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2386
  
I got some offline feedback from @aljoscha regarding the change to implement the methods required by `OneInputStreamOperator` and `TwoInputStreamOperator` in `AbstractStreamOperator` to avoid "polluting" all the custom operators.

I'm currently working on fixing the `EventTimeWindowCheckpointingITCase`, which seems to be failing more frequently due to my change.
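The idea of implementing the operator-interface methods once in `AbstractStreamOperator` can be sketched as follows. All types here are hypothetical simplifications, not Flink's actual interfaces:

- `OneInputOperator`
- `AbstractOperatorBase`
- `SketchLatencyMarker`

The point is that a default in the base class lets existing custom operators compile without adding marker handling themselves.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical marker carrying the source emission time. */
final class SketchLatencyMarker {
    final long markedTime;

    SketchLatencyMarker(long markedTime) {
        this.markedTime = markedTime;
    }
}

/** Hypothetical one-input operator interface that gained a marker callback. */
interface OneInputOperator<IN> {
    void processElement(IN element);

    void processLatencyMarker(SketchLatencyMarker marker);
}

/** Hypothetical base class: implements the marker callback once for all subclasses. */
abstract class AbstractOperatorBase<IN> implements OneInputOperator<IN> {
    /** Stands in for the downstream output. */
    final List<SketchLatencyMarker> forwardedMarkers = new ArrayList<>();

    @Override
    public void processLatencyMarker(SketchLatencyMarker marker) {
        // Default behaviour: forward the marker unchanged.
        forwardedMarkers.add(marker);
    }
}

/** A custom operator only implements its own record logic. */
final class UppercaseOperator extends AbstractOperatorBase<String> {
    final List<String> out = new ArrayList<>();

    @Override
    public void processElement(String element) {
        out.add(element.toUpperCase());
    }
}
```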



[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434595#comment-15434595
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2386
  
I agree that the semantics are not properly defined. I'll add a section to the documentation explaining what users can expect from the `LatencyMarks`.


> Measure latency of elements and expose it through web interface
> ---
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the 
> web interface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> to each record at the sources.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes per element plus a System.currentTimeMillis() call for each 
> element).
> Therefore, I suggest implementing this feature by periodically sending 
> special events through the topology, similar to watermarks. 
> These {{LatencyMarks}} are emitted by the sources at a configurable interval 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be 
> delayed similarly to regular records, so their latency will approximate the 
> record latency.
> The above suggestion assumes that the clocks on all TaskManagers are in sync. 
> Otherwise, the measured latencies would also include the offsets between the 
> TaskManagers' clocks.
> In a second step, we can try to mitigate the issue by using the JobManager as 
> a central timing service. The TaskManagers will periodically query the JM for 
> the current time in order to determine the offset from their own clock.
> This offset would still include the network latency between TM and JM, but it 
> would still lead to reasonably good estimations.
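
A minimal sketch of the proposed offset correction, assuming each TaskManager already holds a JobManager timestamp obtained from some periodic query (the query itself is omitted). `ClockOffsetSketch` and its methods are hypothetical names, not Flink API. As the description notes, the naive offset computed here still contains the network delay between TM and JM.

```java
/**
 * Hypothetical sketch of the "JobManager as a central timing service" idea.
 * Each TaskManager estimates offset = jmTime - localTime and shifts its local
 * timestamps by that offset before latencies are compared.
 */
final class ClockOffsetSketch {

    /**
     * Naive estimate: JobManager time carried in the reply minus the local time
     * at which the reply arrived. The JM-to-TM network delay is NOT removed,
     * so the estimate is off by roughly that delay.
     */
    static long estimateOffset(long jmTimeMillis, long localReceiveTimeMillis) {
        return jmTimeMillis - localReceiveTimeMillis;
    }

    /** Latency of a marker after translating both local clocks into JobManager time. */
    static long correctedLatency(long markedTimeAtSource, long sourceOffset,
                                 long sinkNow, long sinkOffset) {
        long markedInJmTime = markedTimeAtSource + sourceOffset;
        long nowInJmTime = sinkNow + sinkOffset;
        return nowInJmTime - markedInJmTime;
    }

    public static void main(String[] args) {
        // Assume zero network delay for this example.
        // Source TM clock is 300 ms behind the JM; sink TM clock is 200 ms ahead.
        long sourceOffset = estimateOffset(10_300, 10_000); // +300
        long sinkOffset = estimateOffset(10_000, 10_200);   // -200

        // Marker stamped at source-local 20_000 (JM 20_300) and seen at
        // sink-local 20_550 (JM 20_350): the actual transit time is 50 ms.
        long naive = 20_550 - 20_000;
        long corrected = correctedLatency(20_000, sourceOffset, 20_550, sinkOffset);
        System.out.println("naive=" + naive + " corrected=" + corrected);
    }
}
```

Running `main` prints `naive=550 corrected=50`: without the correction, the 500 ms skew between the two TaskManagers' clocks would be reported as latency. Halving a measured round-trip time, as in Cristian's algorithm, is one possible refinement to reduce the remaining error; it is not part of the proposal above.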





[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427964#comment-15427964
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2386
  
Thanks for the contribution @rmetzger. I briefly skimmed over the code and 
noticed that `LatencyMarker`s don't participate in windows. Thus, I was asking 
myself what latency actually means. Could you please properly define what 
latency measures and how it can be a useful metric for users (especially with 
respect to windows and window functions)?
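
One way to see why this matters: if a window operator buffers records until the window fires but forwards latency markers right away, the measured marker latency excludes the time records spend waiting in the window. The toy operator below illustrates that effect under simplifying assumptions (a count window, logical timestamps, a single input); `ToyWindowOperator` is hypothetical and not Flink's `WindowOperator`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy operator (not Flink's WindowOperator): records are buffered
 * in a count window until it fires, while latency markers bypass the window.
 */
final class ToyWindowOperator {
    private final int windowSize;
    private final List<Long> arrivalTimes = new ArrayList<>();
    /** How long each record waited inside this operator before being emitted. */
    final List<Long> emittedRecordDelays = new ArrayList<>();
    /** How long each marker waited inside this operator before being forwarded. */
    final List<Long> emittedMarkerDelays = new ArrayList<>();

    ToyWindowOperator(int windowSize) {
        this.windowSize = windowSize;
    }

    /** A record arrives at logical time {@code now}; a full window fires immediately. */
    void processRecord(long now) {
        arrivalTimes.add(now);
        if (arrivalTimes.size() == windowSize) {
            for (long arrival : arrivalTimes) {
                emittedRecordDelays.add(now - arrival);
            }
            arrivalTimes.clear();
        }
    }

    /** Markers are not assigned to windows, so they leave in the same call: zero wait. */
    void processLatencyMarker() {
        emittedMarkerDelays.add(0L);
    }

    public static void main(String[] args) {
        ToyWindowOperator op = new ToyWindowOperator(4);
        op.processRecord(0);
        op.processRecord(10);
        op.processLatencyMarker(); // arrives at t = 15
        op.processRecord(20);
        op.processRecord(30);      // window fires at t = 30
        System.out.println("record delays: " + op.emittedRecordDelays);
        System.out.println("marker delays: " + op.emittedMarkerDelays);
    }
}
```

Here the records wait 30, 20, 10 and 0 time units inside the window while the marker waits 0, so a marker-based metric would not reflect the window delay at all.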




[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427949#comment-15427949
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2386
  
Good point, thank you! I'm not sure yet whether I'll keep the format of the 
metrics like this.
I find those nested HashMaps quite clumsy. Once this PR and the 
metrics-in-webinterface PR are merged, I'll add the latency to the web interface 
as well. Then I can make a final decision on the format.




[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427947#comment-15427947
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2386
  
The metrics documentation contains a list of all measured metrics; we 
should add this metric there as well.




[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426825#comment-15426825
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2386

[FLINK-3660] Measure latency and exposes them via a metric

This commit adds the initial runtime support for measuring latency of 
records going through the system.

I therefore introduced a new StreamElement, called a LatencyMarker.
Similar to Watermarks, LatencyMarkers are emitted from the sources at a 
configured interval. The default value for the interval is 2000 ms. The 
emission of markers can be disabled by setting the interval to 0. 
LatencyMarkers cannot "overtake" regular elements. This ensures that the 
measured latency approximates the end-to-end latency of regular stream elements.
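
The emission rule described above can be simulated as follows. This is a sketch under stated assumptions, not the code in the pull request: time is simulated by passing record emission times explicitly, and `LatencyMarkerEmitterSketch`, `emit`, and the nested `LatencyMarker` class are hypothetical. Only the 2000 ms default and the meaning of an interval of 0 come from the description.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of source-side marker emission; not Flink's source code. */
final class LatencyMarkerEmitterSketch {

    /** Default interval quoted in the PR description. */
    static final long DEFAULT_INTERVAL_MILLIS = 2000;

    /** A marker carries the time it was created at the source and the source instance. */
    static final class LatencyMarker {
        final long markedTime;
        final int sourceSubtask;

        LatencyMarker(long markedTime, int sourceSubtask) {
            this.markedTime = markedTime;
            this.sourceSubtask = sourceSubtask;
        }

        @Override
        public String toString() {
            return "LatencyMarker(" + markedTime + ", subtask " + sourceSubtask + ")";
        }
    }

    /**
     * Emits records (given by their emission times in ms since start) and markers
     * into one ordered output. Markers that fell due before a record are written
     * first; because everything shares one FIFO output, a marker can never overtake
     * a record emitted earlier. An interval of 0 (or less) disables markers.
     */
    static List<Object> emit(long[] recordTimes, long intervalMillis, int subtask) {
        List<Object> out = new ArrayList<>();
        long nextMarkerTime = intervalMillis; // first marker one interval after start
        for (long t : recordTimes) {
            while (intervalMillis > 0 && nextMarkerTime <= t) {
                out.add(new LatencyMarker(nextMarkerTime, subtask));
                nextMarkerTime += intervalMillis;
            }
            out.add("record@" + t);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {500, 1500, 2500, 4100};
        System.out.println(emit(times, DEFAULT_INTERVAL_MILLIS, 0));
        System.out.println(emit(times, 0, 0)); // markers disabled
    }
}
```

With the default interval, the markers stamped at 2000 ms and 4000 ms appear between the records in emission order; with an interval of 0, the output contains only the four records.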

Regular operators (excluding those participating in iterations) forward 
latency markers if they are not a sink.
Operators with many outputs randomly select one to forward the marker to. 
This ensures that every marker exists only once in the system, and that 
repartition steps do not cause an explosion in the number of transferred 
markers.
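
A minimal sketch of this forwarding rule, assuming a fixed number of output channels and a seeded `java.util.Random` for reproducibility. `RandomMarkerForwardingSketch` is a hypothetical name; the operator code in the pull request may choose the channel differently.

```java
import java.util.Arrays;
import java.util.Random;

/**
 * Hypothetical sketch: each incoming latency marker is sent to exactly one
 * randomly chosen output channel instead of being broadcast to all of them.
 */
final class RandomMarkerForwardingSketch {
    private final Random random;
    final int[] markersPerOutput;

    RandomMarkerForwardingSketch(int numOutputs, long seed) {
        if (numOutputs <= 0) {
            throw new IllegalArgumentException("need at least one output");
        }
        this.random = new Random(seed);
        this.markersPerOutput = new int[numOutputs];
    }

    /** Picks the single output channel that receives this marker. */
    int forward() {
        int target = random.nextInt(markersPerOutput.length);
        markersPerOutput[target]++;
        return target;
    }

    public static void main(String[] args) {
        RandomMarkerForwardingSketch fwd = new RandomMarkerForwardingSketch(4, 42L);
        for (int i = 0; i < 1000; i++) {
            fwd.forward();
        }
        // 1000 markers in, 1000 markers out (broadcasting would send 4000).
        System.out.println(Arrays.toString(fwd.markersPerOutput)
                + " total=" + Arrays.stream(fwd.markersPerOutput).sum());
    }
}
```

Because each marker takes exactly one path, 1000 incoming markers produce 1000 outgoing ones regardless of the number of outputs. Broadcasting instead would multiply the marker count by the fan-out at every repartition step.
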
If an operator is a sink, it will maintain the last 512 latencies from each 
known source instance.
The min/max/mean/p50/p95/p99 of each known source is reported using a 
special LatencyGauge from the sink (every operator can be a sink, if it doesn't 
have any outputs).
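
The sink-side bookkeeping could look like the sketch below: a bounded history of 512 latencies per source instance, summarized on demand. The class and method names are hypothetical, sources are identified by an `int` subtask index, and the nearest-rank percentile definition is an assumption; the description does not say how `LatencyGauge` computes its percentiles.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sink-side bookkeeping: keep the most recent 512 latencies per
 * source instance and summarize them as min/max/mean/p50/p95/p99.
 */
final class SinkLatencyHistorySketch {
    static final int HISTORY_SIZE = 512;

    private final Map<Integer, Deque<Long>> history = new HashMap<>();

    /** Records one observed latency, evicting the oldest value once 512 are held. */
    void reportLatency(int sourceSubtask, long latencyMillis) {
        Deque<Long> latencies = history.computeIfAbsent(sourceSubtask, k -> new ArrayDeque<>());
        if (latencies.size() == HISTORY_SIZE) {
            latencies.removeFirst();
        }
        latencies.addLast(latencyMillis);
    }

    /** Summary for one source; empty if no marker from it has arrived yet. */
    Map<String, Double> stats(int sourceSubtask) {
        Map<String, Double> result = new LinkedHashMap<>();
        Deque<Long> latencies = history.get(sourceSubtask);
        if (latencies == null || latencies.isEmpty()) {
            return result;
        }
        List<Long> sorted = new ArrayList<>(latencies);
        Collections.sort(sorted);
        double sum = 0;
        for (long v : sorted) {
            sum += v;
        }
        result.put("min", sorted.get(0).doubleValue());
        result.put("max", sorted.get(sorted.size() - 1).doubleValue());
        result.put("mean", sum / sorted.size());
        result.put("p50", percentile(sorted, 0.50));
        result.put("p95", percentile(sorted, 0.95));
        result.put("p99", percentile(sorted, 0.99));
        return result;
    }

    /** Nearest-rank percentile on an already sorted, non-empty list. */
    static double percentile(List<Long> sorted, double q) {
        int rank = (int) Math.ceil(q * sorted.size()); // 1-based rank
        return sorted.get(Math.max(rank, 1) - 1).doubleValue();
    }

    public static void main(String[] args) {
        SinkLatencyHistorySketch sink = new SinkLatencyHistorySketch();
        for (long latency = 1; latency <= 1000; latency++) {
            sink.reportLatency(0, latency); // only 489..1000 are retained
        }
        sink.reportLatency(1, 7);
        System.out.println("source 0: " + sink.stats(0));
        System.out.println("source 1: " + sink.stats(1));
    }
}
```

After 1000 reports for source 0, only the latest 512 values (489 to 1000) are retained, so `main` reports min 489.0, max 1000.0, mean 744.5, p50 744.0, p95 975.0 and p99 995.0 for that source.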

This commit does not visualize the latency in the web interface.
Also, there is currently no mechanism to ensure that the system clocks are 
in sync, so the latency measurements will be inaccurate if the hardware clocks 
are not correct.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink3660-pr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2386.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2386








[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-05-22 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15295688#comment-15295688
 ] 

Stephan Ewen commented on FLINK-3660:
-

I would break this into two tasks:
  - measure latency and expose it via a metric
  - visualize that metric in the Web Dashboard


