[FLINK-3660] Measure latency and exposes them via a metric

This commit adds the initial runtime support for measuring latency of records 
going through the system.

I therefore introduced a new StreamElement, called a LatencyMarker.
Similar to Watermarks, LatencyMarkers are emitted from the sources at an 
configured interval. The default value for the interval is 2000 ms. The 
emission of markers can be disabled by setting the interval to 0. 
LatencyMarkers can not "overtake" regular elements. This ensures that the 
measured latency approximates the end-to-end latency of regular stream elements.

Regular operators (excluding those participating in iterations) forward latency 
markers if they are not a sink.
Operators with many outputs randomly select one to forward the maker to. This 
ensures that every marker exists only once in the system, and that repartition 
steps are not causing an explosion in the number of transferred markers.
If an operator is a sink, it will maintain the last 512 latencies from each 
known source instance.
The min/max/mean/p50/p95/p99 of each known source is reported using a special 
LatencyGauge from the sink (every operator can be a sink, if it doesn't have 
any outputs).

This commit does not visualize the latency in the web interface.
Also, there is currently no mechanism to ensure that the system clocks are 
in-sync, so the latency measurements will be inaccurate if the hardware clocks 
are not correct.

This closes #2386


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a612b996
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a612b996
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a612b996

Branch: refs/heads/master
Commit: a612b9966f3ee020a5721ac2f039a3633c40146c
Parents: a648f88
Author: Robert Metzger <rmetz...@apache.org>
Authored: Mon Apr 4 12:09:56 2016 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Fri Oct 14 14:33:32 2016 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      | 113 +++++++-----
 docs/setup/config.md                            |   3 +
 .../flink/storm/wrappers/BoltWrapperTest.java   |   2 +
 .../flink/api/common/ExecutionConfig.java       |  39 ++++
 .../flink/configuration/ConfigConstants.java    |   5 +
 .../flink/runtime/execution/Environment.java    |   2 +-
 .../io/network/api/writer/RecordWriter.java     |  53 +++---
 .../operators/testutils/DummyEnvironment.java   |   2 +-
 .../api/collector/selector/DirectedOutput.java  |  11 ++
 .../api/operators/AbstractStreamOperator.java   | 179 ++++++++++++++++++-
 .../api/operators/OneInputStreamOperator.java   |   3 +
 .../flink/streaming/api/operators/Output.java   |   3 +
 .../streaming/api/operators/StreamCounter.java  |  44 -----
 .../api/operators/StreamGroupedReduce.java      |   1 -
 .../streaming/api/operators/StreamSink.java     |  10 +-
 .../streaming/api/operators/StreamSource.java   |  42 +++++
 .../api/operators/TwoInputStreamOperator.java   |  25 ++-
 .../runtime/io/RecordWriterOutput.java          |  17 +-
 .../runtime/io/StreamInputProcessor.java        |  26 +--
 .../runtime/io/StreamRecordWriter.java          |   9 +
 .../runtime/io/StreamTwoInputProcessor.java     |  37 ++--
 .../runtime/streamrecord/LatencyMarker.java     | 106 +++++++++++
 .../MultiplexingStreamRecordSerializer.java     |  23 ++-
 .../runtime/streamrecord/StreamElement.java     |  17 ++
 .../runtime/tasks/OneInputStreamTask.java       |   2 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  51 ++++--
 .../runtime/tasks/StreamIterationTail.java      |  10 ++
 .../streaming/runtime/tasks/StreamTask.java     |   8 +
 .../runtime/tasks/TwoInputStreamTask.java       |   2 +-
 .../api/graph/StreamGraphGeneratorTest.java     |  16 ++
 .../api/operators/StreamCounterTest.java        |  61 -------
 .../operators/StreamSourceOperatorTest.java     |  67 ++++++-
 ...AlignedProcessingTimeWindowOperatorTest.java |   1 +
 .../operators/windowing/CollectingOutput.java   |   8 +-
 .../apache/flink/streaming/util/MockOutput.java |   6 +
 .../util/OneInputStreamOperatorTestHarness.java |   6 +
 .../util/TwoInputStreamOperatorTestHarness.java |   6 +
 .../test/streaming/runtime/TimestampITCase.java |   1 +
 38 files changed, 781 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 0e51407..6de5b5e 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -376,7 +376,6 @@ Flink exposes the following system metrics:
       <th class="text-left">Description</th>
     </tr>
   </thead>
-
   <tbody>
     <tr>
       <th rowspan="1"><strong>JobManager</strong></th>
@@ -475,52 +474,76 @@ Flink exposes the following system metrics:
       <td></td>
     </tr>
     <tr>
-      <tr>
-        <th rowspan="7"><strong>Task</strong></t>
-        <td>currentLowWatermark</td>
-        <td>The lowest watermark a task has received.</td>
-      </tr>
-      <tr>
-        <td>lastCheckpointDuration</td>
-        <td>The time it took to complete the last checkpoint.</td>
-      </tr>
-      <tr>
-        <td>lastCheckpointSize</td>
-        <td>The total size of the last checkpoint.</td>
-      </tr>
-      <tr>
-        <td>restartingTime</td>
-        <td>The time it took to restart the job.</td>
-      </tr>
-      <tr>
-        <td>numBytesInLocal</td>
-        <td>The total number of bytes this task has read from a local 
source.</td>
-      </tr>
-      <tr>
-        <td>numBytesInRemote</td>
-        <td>The total number of bytes this task has read from a remote 
source.</td>
-      </tr>
-      <tr>
-        <td>numBytesOut</td>
-        <td>The total number of bytes this task has emitted.</td>
-      </tr>
-    </tr>
-    <tr>
-      <tr>
-        <th rowspan="3"><strong>Operator</strong></th>
-        <td>numRecordsIn</td>
-        <td>The total number of records this operator has received.</td>
-      </tr>
-      <tr>
-        <td>numRecordsOut</td>
-        <td>The total number of records this operator has emitted.</td>
-      </tr>
-      <tr>
-        <td>numSplitsProcessed</td>
-        <td>The total number of InputSplits this data source has 
processed.</td>
-      </tr>
+      <th rowspan="7"><strong>Task</strong></th>
+      <td>currentLowWatermark</td>
+      <td>The lowest watermark a task has received.</td>
+    </tr>
+    <tr>
+      <td>lastCheckpointDuration</td>
+      <td>The time it took to complete the last checkpoint.</td>
+    </tr>
+    <tr>
+      <td>lastCheckpointSize</td>
+      <td>The total size of the last checkpoint.</td>
+    </tr>
+    <tr>
+      <td>restartingTime</td>
+      <td>The time it took to restart the job.</td>
+    </tr>
+    <tr>
+      <td>numBytesInLocal</td>
+      <td>The total number of bytes this task has read from a local 
source.</td>
+    </tr>
+    <tr>
+      <td>numBytesInRemote</td>
+      <td>The total number of bytes this task has read from a remote 
source.</td>
+    </tr>
+    <tr>
+      <td>numBytesOut</td>
+      <td>The total number of bytes this task has emitted.</td>
+    </tr>
+    <tr>
+      <th rowspan="4"><strong>Operator</strong></th>
+      <td>numRecordsIn</td>
+      <td>The total number of records this operator has received.</td>
+    </tr>
+    <tr>
+      <td>numRecordsOut</td>
+      <td>The total number of records this operator has emitted.</td>
+    </tr>
+    <tr>
+      <td>numSplitsProcessed</td>
+      <td>The total number of InputSplits this data source has processed (if 
the operator is a data source).</td>
+    </tr>
+    <tr>
+      <td>latency</td>
+      <td>A latency gauge reporting the latency distribution from the 
different sources.</td>
     </tr>
   </tbody>
 </table>
 
+
+### Latency tracking
+
+Flink allows to track the latency of records traveling through the system. To 
enable the latency tracking
+a `latencyTrackingInterval` (in milliseconds) has to be set to a positive 
value in the `ExecutionConfig`.
+
+At the `latencyTrackingInterval`, the sources will periodically emit a special 
record, called a `LatencyMarker`.
+The marker contains a timestamp from the time when the record has been emitted 
at the sources.
+Latency markers can not overtake regular user records, thus if records are 
queuing up in front of an operator, 
+it will add to the latency tracked by the marker.
+
+Note that the latency markers are not accounting for the time user records 
spend in operators as they are
+bypassing them. In particular the markers are not accounting for the time 
records spend for example in window buffers.
+Only if operators are not able to accept new records, thus they are queuing 
up, the latency measured using
+the markers will reflect that.
+
+All intermediate operators keep a list of the last `n` latencies from each 
source to compute 
+a latency distribution.
+The sink operators keep a list from each source, and each parallel source 
instance to allow detecting 
+latency issues caused by individual machines.
+
+Currently, Flink assumes that the clocks of all machines in the cluster are in 
sync. We recommend setting
+up an automated clock synchronisation service (like NTP) to avoid false 
latency results.
+
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 54ef394..3f6b705 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -378,6 +378,9 @@ Previously this key was named `recovery.mode` and the 
default value was `standal
 
 - `metrics.scope.tm.operator`: (Default: 
&lt;host&gt;.taskmanager.&lt;tm_id&gt;.&lt;job_name&gt;.&lt;operator_name&gt;.&lt;subtask_index&gt;)
 Defines the scope format string that is applied to all metrics scoped to an 
operator.
 
+- `metrics.latency.history-size`: (Default: 128) Defines the number of 
measured latencies to maintain at each operator
+
+
 ## Background
 
 ### Configuring the Network Buffers

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index c15b5f6..e0659da 100644
--- 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
 import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.SplitStreamType;
 import org.apache.flink.storm.util.StormConfig;
@@ -369,6 +370,7 @@ public class BoltWrapperTest extends AbstractTest {
                when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 1, 
0, 1, 0));
                
when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
                when(env.getMetricGroup()).thenReturn(new 
UnregisteredTaskMetricsGroup());
+               when(env.getTaskManagerInfo()).thenReturn(new 
TaskManagerRuntimeInfo("foo", new Configuration(), "foo"));
 
                StreamTask<?, ?> mockTask = mock(StreamTask.class);
                when(mockTask.getCheckpointLock()).thenReturn(new Object());

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index a0a63b1..3daf9cf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -121,6 +121,11 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
        private long autoWatermarkInterval = 0;
 
        /**
+        * Interval in milliseconds for sending latency tracking marks from the 
sources to the sinks.
+        */
+       private long latencyTrackingInterval = 2000L;
+
+       /**
         * @deprecated Should no longer be used because it is subsumed by 
RestartStrategyConfiguration
         */
        @Deprecated
@@ -205,6 +210,40 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
        }
 
        /**
+        * Interval for sending latency tracking marks from the sources to the 
sinks.
+        * Flink will send latency tracking marks from the sources at the 
specified interval.
+        *
+        * Recommended value: 2000 (2 seconds).
+        *
+        * Setting a tracking interval <= 0 disables the latency tracking.
+        *
+        * @param interval Interval in milliseconds.
+        */
+       @PublicEvolving
+       public ExecutionConfig setLatencyTrackingInterval(long interval) {
+               this.latencyTrackingInterval = interval;
+               return this;
+       }
+
+       /**
+        * Returns the latency tracking interval.
+        * @return The latency tracking interval in milliseconds
+        */
+       @PublicEvolving
+       public long getLatencyTrackingInterval() {
+               return latencyTrackingInterval;
+       }
+
+       /**
+        * Returns if latency tracking is enabled
+        * @return True, if the tracking is enabled, false otherwise.
+        */
+       @PublicEvolving
+       public boolean isLatencyTrackingEnabled() {
+               return latencyTrackingInterval > 0;
+       }
+
+       /**
         * Gets the parallelism with which operation are executed by default. 
Operations can
         * individually override this value to use a specific parallelism.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f508d4a..3fe0306 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -868,6 +868,10 @@ public final class ConfigConstants {
        /** The scope format string that is applied to all metrics scoped to an 
operator. */
        public static final String METRICS_SCOPE_NAMING_OPERATOR = 
"metrics.scope.operator";
 
+       /** The number of measured latencies to maintain at each operator */
+       public static final String METRICS_LATENCY_HISTORY_SIZE = 
"metrics.latency.history-size";
+
+
        // ---------------------------- Checkpoints 
-------------------------------
 
        /** The default directory for savepoints. */
@@ -886,6 +890,7 @@ public final class ConfigConstants {
        @Deprecated
        public static final String SAVEPOINT_FS_DIRECTORY_KEY = 
"savepoints.state.backend.fs.dir";
 
+
        // 
------------------------------------------------------------------------
        //                            Default Values
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index cbbeec7..f0ff918 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -79,7 +79,7 @@ public interface Environment {
        ExecutionAttemptID getExecutionId();
 
        /**
-        * Returns the task-wide configuration object, originally attache to 
the job vertex.
+        * Returns the task-wide configuration object, originally attached to 
the job vertex.
         *
         * @return The task-wide configuration
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 4963698..422aa65 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -25,8 +25,10 @@ import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.XORShiftRandom;
 
 import java.io.IOException;
+import java.util.Random;
 
 import static 
org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
 
@@ -54,6 +56,8 @@ public class RecordWriter<T extends IOReadableWritable> {
        /** {@link RecordSerializer} per outgoing channel */
        private final RecordSerializer<T>[] serializers;
 
+       private final Random RNG = new XORShiftRandom();
+
        public RecordWriter(ResultPartitionWriter writer) {
                this(writer, new RoundRobinChannelSelector<T>());
        }
@@ -78,22 +82,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 
        public void emit(T record) throws IOException, InterruptedException {
                for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-                       // serialize with corresponding serializer and send 
full buffer
-                       RecordSerializer<T> serializer = 
serializers[targetChannel];
-
-                       synchronized (serializer) {
-                               SerializationResult result = 
serializer.addRecord(record);
-                               while (result.isFullBuffer()) {
-                                       Buffer buffer = 
serializer.getCurrentBuffer();
-
-                                       if (buffer != null) {
-                                               writeBuffer(buffer, 
targetChannel, serializer);
-                                       }
-
-                                       buffer = 
writer.getBufferProvider().requestBufferBlocking();
-                                       result = 
serializer.setNextBuffer(buffer);
-                               }
-                       }
+                       sendToTarget(record, targetChannel);
                }
        }
 
@@ -103,21 +92,31 @@ public class RecordWriter<T extends IOReadableWritable> {
         */
        public void broadcastEmit(T record) throws IOException, 
InterruptedException {
                for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-                       // serialize with corresponding serializer and send 
full buffer
-                       RecordSerializer<T> serializer = 
serializers[targetChannel];
+                       sendToTarget(record, targetChannel);
+               }
+       }
 
-                       synchronized (serializer) {
-                               SerializationResult result = 
serializer.addRecord(record);
-                               while (result.isFullBuffer()) {
-                                       Buffer buffer = 
serializer.getCurrentBuffer();
+       /**
+        * This is used to send LatencyMarks to a random target channel
+        */
+       public void randomEmit(T record) throws IOException, 
InterruptedException {
+               sendToTarget(record, RNG.nextInt(numChannels));
+       }
 
-                                       if (buffer != null) {
-                                               writeBuffer(buffer, 
targetChannel, serializer);
-                                       }
+       private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
+               RecordSerializer<T> serializer = serializers[targetChannel];
 
-                                       buffer = 
writer.getBufferProvider().requestBufferBlocking();
-                                       result = 
serializer.setNextBuffer(buffer);
+               synchronized (serializer) {
+                       SerializationResult result = 
serializer.addRecord(record);
+                       while (result.isFullBuffer()) {
+                               Buffer buffer = serializer.getCurrentBuffer();
+
+                               if (buffer != null) {
+                                       writeBuffer(buffer, targetChannel, 
serializer);
                                }
+
+                               buffer = 
writer.getBufferProvider().requestBufferBlocking();
+                               result = serializer.setNextBuffer(buffer);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index bb07122..04ba4e5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -92,7 +92,7 @@ public class DummyEnvironment implements Environment {
 
        @Override
        public TaskManagerRuntimeInfo getTaskManagerInfo() {
-               return null;
+               return new TaskManagerRuntimeInfo("foo", new Configuration(), 
"foo");
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
index 8346013..24f1c63 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
@@ -23,13 +23,16 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.XORShiftRandom;
 
 
 public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
@@ -42,6 +45,8 @@ public class DirectedOutput<OUT> implements 
Output<StreamRecord<OUT>> {
        
        protected final Output<StreamRecord<OUT>>[] allOutputs;
 
+       private final Random RNG = new XORShiftRandom();
+
        
        @SuppressWarnings({"unchecked", "rawtypes"})
        public DirectedOutput(
@@ -100,6 +105,12 @@ public class DirectedOutput<OUT> implements 
Output<StreamRecord<OUT>> {
                }
        }
 
+       @Override
+       public void emitLatencyMarker(LatencyMarker latencyMarker) {
+               // randomly select an output
+               
allOutputs[RNG.nextInt(allOutputs.length)].emitLatencyMarker(latencyMarker);
+       }
+
        protected Set<Output<StreamRecord<OUT>>> 
selectOutputs(StreamRecord<OUT> record)  {
                Set<Output<StreamRecord<OUT>>> selectedOutputs = new 
HashSet<>(selectAllOutputs.length);
                Collections.addAll(selectedOutputs, selectAllOutputs);

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 0ca89ef..77e4d9a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -19,13 +19,17 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -38,12 +42,16 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ConcurrentModificationException;
 import java.util.Collection;
 import java.util.concurrent.RunnableFuture;
 
@@ -103,7 +111,13 @@ public abstract class AbstractStreamOperator<OUT>
 
        private transient Collection<OperatorStateHandle> 
lazyRestoreStateHandles;
 
-       protected transient MetricGroup metrics;
+
+       // --------------- Metrics ---------------------------
+
+       /** Metric group for the operator */
+       protected MetricGroup metrics;
+
+       protected LatencyGauge latencyGauge;
 
        // 
------------------------------------------------------------------------
        //  Life Cycle
@@ -117,12 +131,21 @@ public abstract class AbstractStreamOperator<OUT>
                
                this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
                this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
+               Configuration taskManagerConfig = 
container.getEnvironment().getTaskManagerInfo().getConfiguration();
+               int historySize = 
taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, 
ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
+               if (historySize <= 0) {
+                       LOG.warn("{} has been set to a value below 0: {}. Using 
default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize);
+                       historySize = 
ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE;
+               }
+
+               latencyGauge = this.metrics.gauge("latency", new 
LatencyGauge(historySize));
                this.runtimeContext = new StreamingRuntimeContext(this, 
container.getEnvironment(), container.getAccumulatorMap());
 
                stateKeySelector1 = config.getStatePartitioner(0, 
getUserCodeClassloader());
                stateKeySelector2 = config.getStatePartitioner(1, 
getUserCodeClassloader());
        }
        
+       @Override
        public MetricGroup getMetricGroup() {
                return metrics;
        }
@@ -365,6 +388,155 @@ public abstract class AbstractStreamOperator<OUT>
                return chainingStrategy;
        }
 
+
+       // 
------------------------------------------------------------------------
+       //  Metrics
+       // 
------------------------------------------------------------------------
+
+       // ------- One input stream
+       public void processLatencyMarker(LatencyMarker latencyMarker) throws 
Exception {
+               reportOrForwardLatencyMarker(latencyMarker);
+       }
+
+       // ------- Two input stream
+       public void processLatencyMarker1(LatencyMarker latencyMarker) throws 
Exception {
+               reportOrForwardLatencyMarker(latencyMarker);
+       }
+
+       public void processLatencyMarker2(LatencyMarker latencyMarker) throws 
Exception {
+               reportOrForwardLatencyMarker(latencyMarker);
+       }
+
+
+       protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
+               // all operators are tracking latencies
+               this.latencyGauge.reportLatency(maker, false);
+
+               // everything except sinks forwards latency markers
+               this.output.emitLatencyMarker(maker);
+       }
+
+       // ----------------------- Helper classes -----------------------
+
+
+       /**
+        * The gauge uses a HashMap internally to avoid classloading issues 
when accessing
+        * the values using JMX.
+        */
+       protected static class LatencyGauge implements Gauge<Map<String, 
HashMap<String, Double>>> {
+               private final Map<LatencySourceDescriptor, 
DescriptiveStatistics> latencyStats = new HashMap<>();
+               private final int historySize;
+
+               LatencyGauge(int historySize) {
+                       this.historySize = historySize;
+               }
+
+               public void reportLatency(LatencyMarker marker, boolean isSink) 
{
+                       LatencySourceDescriptor sourceDescriptor = 
LatencySourceDescriptor.of(marker, !isSink);
+                       DescriptiveStatistics sourceStats = 
latencyStats.get(sourceDescriptor);
+                       if (sourceStats == null) {
+                               // 512 element window (4 kb)
+                               sourceStats = new 
DescriptiveStatistics(this.historySize);
+                               latencyStats.put(sourceDescriptor, sourceStats);
+                       }
+                       long now = System.currentTimeMillis();
+                       sourceStats.addValue(now - marker.getMarkedTime());
+               }
+
+               @Override
+               public Map<String, HashMap<String, Double>> getValue() {
+                       while (true) {
+                               try {
+                                       Map<String, HashMap<String, Double>> 
ret = new HashMap<>();
+                                       for (Map.Entry<LatencySourceDescriptor, 
DescriptiveStatistics> source : latencyStats.entrySet()) {
+                                               HashMap<String, Double> 
sourceStatistics = new HashMap<>(6);
+                                               sourceStatistics.put("max", 
source.getValue().getMax());
+                                               sourceStatistics.put("mean", 
source.getValue().getMean());
+                                               sourceStatistics.put("min", 
source.getValue().getMin());
+                                               sourceStatistics.put("p50", 
source.getValue().getPercentile(50));
+                                               sourceStatistics.put("p95", 
source.getValue().getPercentile(95));
+                                               sourceStatistics.put("p99", 
source.getValue().getPercentile(99));
+                                               
ret.put(source.getKey().toString(), sourceStatistics);
+                                       }
+                                       return ret;
+                                       // Concurrent access onto the 
"latencyStats" map could cause
+                                       // ConcurrentModificationExceptions. To 
avoid unnecessary blocking
+                                       // of the reportLatency() method, we 
retry this operation until
+                                       // it succeeds.
+                               } catch(ConcurrentModificationException ignore) 
{
+                                       LOG.debug("Unable to report latency 
statistics", ignore);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Identifier for a latency source
+        */
+       private static class LatencySourceDescriptor {
+               /**
+                * A unique ID identifying a logical source in Flink
+                */
+               private final int vertexID;
+
+               /**
+                * Identifier for parallel subtasks of a logical source
+                */
+               private final int subtaskIndex;
+
+               /**
+                *
+                * @param marker The latency marker to extract the 
LatencySourceDescriptor from.
+                * @param ignoreSubtaskIndex Set to true to ignore the subtask 
index, to treat the latencies from all the parallel instances of a source as 
the same.
+                * @return A LatencySourceDescriptor for the given marker.
+                */
+               public static LatencySourceDescriptor of(LatencyMarker marker, 
boolean ignoreSubtaskIndex) {
+                       if (ignoreSubtaskIndex) {
+                               return new 
LatencySourceDescriptor(marker.getVertexID(), -1);
+                       } else {
+                               return new 
LatencySourceDescriptor(marker.getVertexID(), marker.getSubtaskIndex());
+                       }
+
+               }
+
+               private LatencySourceDescriptor(int vertexID, int subtaskIndex) 
{
+                       this.vertexID = vertexID;
+                       this.subtaskIndex = subtaskIndex;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+
+                       LatencySourceDescriptor that = 
(LatencySourceDescriptor) o;
+
+                       if (vertexID != that.vertexID) {
+                               return false;
+                       }
+                       return subtaskIndex == that.subtaskIndex;
+               }
+
+               @Override
+               public int hashCode() {
+                       int result = vertexID;
+                       result = 31 * result + subtaskIndex;
+                       return result;
+               }
+
+               @Override
+               public String toString() {
+                       return "LatencySourceDescriptor{" +
+                                       "vertexID=" + vertexID +
+                                       ", subtaskIndex=" + subtaskIndex +
+                                       '}';
+               }
+       }
+
        public class CountingOutput implements Output<StreamRecord<OUT>> {
                private final Output<StreamRecord<OUT>> output;
                private final Counter numRecordsOut;
@@ -380,6 +552,11 @@ public abstract class AbstractStreamOperator<OUT>
                }
 
                @Override
+               public void emitLatencyMarker(LatencyMarker latencyMarker) {
+                       output.emitLatencyMarker(latencyMarker);
+               }
+
+               @Override
                public void collect(StreamRecord<OUT> record) {
                        numRecordsOut.inc();
                        output.collect(record);

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
index 323feb5..d9de230 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
@@ -46,4 +47,6 @@ public interface OneInputStreamOperator<IN, OUT> extends 
StreamOperator<OUT> {
         * @see org.apache.flink.streaming.api.watermark.Watermark
         */
        void processWatermark(Watermark mark) throws Exception;
+
+       void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
index 4a7002f..ec2409e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.util.Collector;
 
 /**
@@ -39,4 +40,6 @@ public interface Output<T> extends Collector<T> {
         * timestamp will be emitted in the future.
         */
        void emitWatermark(Watermark mark);
+
+       void emitLatencyMarker(LatencyMarker latencyMarker);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
deleted file mode 100644
index 8835032..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-@Internal
-public class StreamCounter<IN> extends AbstractStreamOperator<Long> implements 
OneInputStreamOperator<IN, Long> {
-
-       private static final long serialVersionUID = 1L;
-
-       private Long count = 0L;
-
-       public StreamCounter() {
-               chainingStrategy = ChainingStrategy.ALWAYS;
-       }
-
-       @Override
-       public void processElement(StreamRecord<IN> element) {
-               output.collect(element.replace(++count));
-       }
-
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               output.emitWatermark(mark);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index b11e22c..229c254 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -70,5 +70,4 @@ public class StreamGroupedReduce<IN> extends 
AbstractUdfStreamOperator<IN, Reduc
                output.emitWatermark(mark);
        }
 
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index 9fa2039..bd0f574 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -30,7 +31,6 @@ public class StreamSink<IN> extends 
AbstractUdfStreamOperator<Object, SinkFuncti
 
        public StreamSink(SinkFunction<IN> sinkFunction) {
                super(sinkFunction);
-
                chainingStrategy = ChainingStrategy.ALWAYS;
        }
 
@@ -43,4 +43,12 @@ public class StreamSink<IN> extends 
AbstractUdfStreamOperator<Object, SinkFuncti
        public void processWatermark(Watermark mark) throws Exception {
                // ignore it for now, we are a sink, after all
        }
+
+       @Override
+       protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
+               // all operators are tracking latencies
+               this.latencyGauge.reportLatency(maker, true);
+
+               // sinks don't forward latency markers
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 1409ae4..a07e6b7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -21,8 +21,14 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
 /**
  * {@link StreamOperator} for streaming sources.
  *
@@ -53,6 +59,13 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
        
        public void run(final Object lockingObject, final 
Output<StreamRecord<OUT>> collector) throws Exception {
                final TimeCharacteristic timeCharacteristic = 
getOperatorConfig().getTimeCharacteristic();
+
+               LatencyMarksEmitter latencyEmitter = null;
+               if(getExecutionConfig().isLatencyTrackingEnabled()) {
+                       latencyEmitter = new 
LatencyMarksEmitter<>(lockingObject, collector, 
getExecutionConfig().getLatencyTrackingInterval(),
+                                       getOperatorConfig().getVertexID(), 
getRuntimeContext().getIndexOfThisSubtask());
+               }
+               
                final long watermarkInterval = 
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
 
                this.ctx = StreamSourceContexts.getSourceContext(
@@ -70,6 +83,9 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                } finally {
                        // make sure that the context is closed in any case
                        ctx.close();
+                       if(latencyEmitter != null) {
+                               latencyEmitter.close();
+                       }
                }
        }
 
@@ -103,4 +119,30 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
        protected boolean isCanceledOrStopped() {
                return canceledOrStopped;
        }
+
+       private static class LatencyMarksEmitter<OUT> {
+               private final ScheduledExecutorService scheduleExecutor;
+               private final ScheduledFuture<?> latencyMarkTimer;
+
+               public LatencyMarksEmitter(final Object lockingObject, final 
Output<StreamRecord<OUT>> output, long latencyTrackingInterval, final int 
vertexID, final int subtaskIndex) {
+                       this.scheduleExecutor = 
Executors.newScheduledThreadPool(1);
+                       this.latencyMarkTimer = 
scheduleExecutor.scheduleAtFixedRate(new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               synchronized (lockingObject) {
+                                                       
output.emitLatencyMarker(new LatencyMarker(System.currentTimeMillis(), 
vertexID, subtaskIndex));
+                                               }
+                                       } catch (Throwable t) {
+                                               LOG.warn("Error while emitting 
latency marker", t);
+                                       }
+                               }
+                       }, 0, latencyTrackingInterval, TimeUnit.MILLISECONDS);
+               }
+
+               public void close() {
+                       latencyMarkTimer.cancel(true);
+                       scheduleExecutor.shutdownNow();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
index d22583d..e45fedf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
@@ -38,13 +39,13 @@ public interface TwoInputStreamOperator<IN1, IN2, OUT> 
extends StreamOperator<OU
         * Processes one element that arrived on the first input of this 
two-input operator.
         * This method is guaranteed to not be called concurrently with other 
methods of the operator.
         */
-       public void processElement1(StreamRecord<IN1> element) throws Exception;
+       void processElement1(StreamRecord<IN1> element) throws Exception;
 
        /**
         * Processes one element that arrived on the second input of this 
two-input operator.
         * This method is guaranteed to not be called concurrently with other 
methods of the operator.
         */
-       public void processElement2(StreamRecord<IN2> element) throws Exception;
+       void processElement2(StreamRecord<IN2> element) throws Exception;
 
        /**
         * Processes a {@link Watermark} that arrived on the first input of 
this two-input operator.
@@ -52,7 +53,7 @@ public interface TwoInputStreamOperator<IN1, IN2, OUT> 
extends StreamOperator<OU
         *
         * @see org.apache.flink.streaming.api.watermark.Watermark
         */
-       public void processWatermark1(Watermark mark) throws Exception;
+       void processWatermark1(Watermark mark) throws Exception;
 
        /**
         * Processes a {@link Watermark} that arrived on the second input of 
this two-input operator.
@@ -60,6 +61,22 @@ public interface TwoInputStreamOperator<IN1, IN2, OUT> 
extends StreamOperator<OU
         *
         * @see org.apache.flink.streaming.api.watermark.Watermark
         */
-       public void processWatermark2(Watermark mark) throws Exception;
+       void processWatermark2(Watermark mark) throws Exception;
+
+       /**
+        * Processes a {@link LatencyMarker} that arrived on the first input of 
this two-input operator.
+        * This method is guaranteed to not be called concurrently with other 
methods of the operator.
+        *
+        * @see org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
+        */
+       void processLatencyMarker1(LatencyMarker latencyMarker) throws 
Exception;
+
+       /**
+        * Processes a {@link LatencyMarker} that arrived on the second input 
of this two-input operator.
+        * This method is guaranteed to not be called concurrently with other 
methods of the operator.
+        *
+        * @see org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
+        */
+       void processLatencyMarker2(LatencyMarker latencyMarker) throws 
Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index c9d579f..9f046f6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -48,7 +49,7 @@ public class RecordWriterOutput<OUT> implements 
Output<StreamRecord<OUT>> {
        public RecordWriterOutput(
                        
StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
                        TypeSerializer<OUT> outSerializer,
-                       boolean enableWatermarkMultiplexing) {
+                       boolean enableMultiplexing) {
 
                checkNotNull(recordWriter);
                
@@ -58,7 +59,7 @@ public class RecordWriterOutput<OUT> implements 
Output<StreamRecord<OUT>> {
                                (StreamRecordWriter<?>) recordWriter;
 
                TypeSerializer<StreamElement> outRecordSerializer;
-               if (enableWatermarkMultiplexing) {
+               if (enableMultiplexing) {
                        outRecordSerializer = new 
MultiplexingStreamRecordSerializer<OUT>(outSerializer);
                } else {
                        outRecordSerializer = (TypeSerializer<StreamElement>)
@@ -94,6 +95,18 @@ public class RecordWriterOutput<OUT> implements 
Output<StreamRecord<OUT>> {
                }
        }
 
+       @Override
+       public void emitLatencyMarker(LatencyMarker latencyMarker) {
+               serializationDelegate.setInstance(latencyMarker);
+
+               try {
+                       recordWriter.randomEmit(serializationDelegate);
+               }
+               catch (Exception e) {
+                       throw new RuntimeException(e.getMessage(), e);
+               }
+       }
+
        public void broadcastEvent(AbstractEvent barrier) throws IOException, 
InterruptedException {
                recordWriter.broadcastEvent(barrier);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 2dbc6d4..47e55dc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -89,7 +89,7 @@ public class StreamInputProcessor<IN> {
                        StatefulTask checkpointedTask,
                        CheckpointingMode checkpointMode,
                        IOManager ioManager,
-                       boolean enableWatermarkMultiplexing) throws IOException 
{
+                       boolean enableMultiplexing) throws IOException {
 
                InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
@@ -107,13 +107,13 @@ public class StreamInputProcessor<IN> {
                        
this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
                }
                
-               if (enableWatermarkMultiplexing) {
-                       MultiplexingStreamRecordSerializer<IN> ser = new 
MultiplexingStreamRecordSerializer<IN>(inputSerializer);
-                       this.deserializationDelegate = new 
NonReusingDeserializationDelegate<StreamElement>(ser);
+               if (enableMultiplexing) {
+                       MultiplexingStreamRecordSerializer<IN> ser = new 
MultiplexingStreamRecordSerializer<>(inputSerializer);
+                       this.deserializationDelegate = new 
NonReusingDeserializationDelegate<>(ser);
                } else {
                        StreamRecordSerializer<IN> ser = new 
StreamRecordSerializer<IN>(inputSerializer);
                        this.deserializationDelegate = 
(NonReusingDeserializationDelegate<StreamElement>)
-                                       (NonReusingDeserializationDelegate<?>) 
new NonReusingDeserializationDelegate<StreamRecord<IN>>(ser);
+                                       (NonReusingDeserializationDelegate<?>) 
new NonReusingDeserializationDelegate<>(ser);
                }
                
                // Initialize one deserializer per input channel
@@ -150,14 +150,14 @@ public class StreamInputProcessor<IN> {
                                }
 
                                if (result.isFullRecord()) {
-                                       StreamElement recordOrWatermark = 
deserializationDelegate.getInstance();
+                                       StreamElement recordOrMark = 
deserializationDelegate.getInstance();
 
-                                       if (recordOrWatermark.isWatermark()) {
-                                               long watermarkMillis = 
recordOrWatermark.asWatermark().getTimestamp();
+                                       if (recordOrMark.isWatermark()) {
+                                               long watermarkMillis = 
recordOrMark.asWatermark().getTimestamp();
                                                if (watermarkMillis > 
watermarks[currentChannel]) {
                                                        
watermarks[currentChannel] = watermarkMillis;
                                                        long newMinWatermark = 
Long.MAX_VALUE;
-                                                       for (long watermark : 
watermarks) {
+                                                       for (long watermark: 
watermarks) {
                                                                newMinWatermark 
= Math.min(watermark, newMinWatermark);
                                                        }
                                                        if (newMinWatermark > 
lastEmittedWatermark) {
@@ -168,9 +168,15 @@ public class StreamInputProcessor<IN> {
                                                        }
                                                }
                                                continue;
+                                       } else 
if(recordOrMark.isLatencyMarker()) {
+                                               // handle latency marker
+                                               synchronized (lock) {
+                                                       
streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
+                                               }
+                                               continue;
                                        } else {
                                                // now we can do the actual 
processing
-                                               StreamRecord<IN> record = 
recordOrWatermark.asRecord();
+                                               StreamRecord<IN> record = 
recordOrMark.asRecord();
                                                synchronized (lock) {
                                                        numRecordsIn.inc();
                                                        
streamOperator.setKeyContextElement1(record);

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index f46b366..6d5e89b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -98,6 +98,15 @@ public class StreamRecordWriter<T extends 
IOReadableWritable> extends RecordWrit
                }
        }
 
+       @Override
+       public void randomEmit(T record) throws IOException, 
InterruptedException {
+               checkErroneous();
+               super.randomEmit(record);
+               if (flushAlways) {
+                       flush();
+               }
+       }
+
        /**
         * Closes the writer. This stops the flushing thread (if there is one).
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 70ce783..a25c1a1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -40,7 +40,6 @@ import 
org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 
 import java.io.IOException;
@@ -97,7 +96,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                        StatefulTask checkpointedTask,
                        CheckpointingMode checkpointMode,
                        IOManager ioManager,
-                       boolean enableWatermarkMultiplexing) throws IOException 
{
+                       boolean enableMultiplexing) throws IOException {
                
                final InputGate inputGate = 
InputGateUtil.createInputGate(inputGates1, inputGates2);
 
@@ -115,24 +114,24 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                        
this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
                }
                
-               if (enableWatermarkMultiplexing) {
-                       MultiplexingStreamRecordSerializer<IN1> ser = new 
MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
-                       this.deserializationDelegate1 = new 
NonReusingDeserializationDelegate<StreamElement>(ser);
+               if (enableMultiplexing) {
+                       MultiplexingStreamRecordSerializer<IN1> ser = new 
MultiplexingStreamRecordSerializer<>(inputSerializer1);
+                       this.deserializationDelegate1 = new 
NonReusingDeserializationDelegate<>(ser);
                }
                else {
-                       StreamRecordSerializer<IN1> ser = new 
StreamRecordSerializer<IN1>(inputSerializer1);
+                       StreamRecordSerializer<IN1> ser = new 
StreamRecordSerializer<>(inputSerializer1);
                        this.deserializationDelegate1 = 
(DeserializationDelegate<StreamElement>)
-                                       (DeserializationDelegate<?>) new 
NonReusingDeserializationDelegate<StreamRecord<IN1>>(ser);
+                                       (DeserializationDelegate<?>) new 
NonReusingDeserializationDelegate<>(ser);
                }
                
-               if (enableWatermarkMultiplexing) {
-                       MultiplexingStreamRecordSerializer<IN2> ser = new 
MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
-                       this.deserializationDelegate2 = new 
NonReusingDeserializationDelegate<StreamElement>(ser);
+               if (enableMultiplexing) {
+                       MultiplexingStreamRecordSerializer<IN2> ser = new 
MultiplexingStreamRecordSerializer<>(inputSerializer2);
+                       this.deserializationDelegate2 = new 
NonReusingDeserializationDelegate<>(ser);
                }
                else {
-                       StreamRecordSerializer<IN2> ser = new 
StreamRecordSerializer<IN2>(inputSerializer2);
+                       StreamRecordSerializer<IN2> ser = new 
StreamRecordSerializer<>(inputSerializer2);
                        this.deserializationDelegate2 = 
(DeserializationDelegate<StreamElement>)
-                                       (DeserializationDelegate<?>) new 
NonReusingDeserializationDelegate<StreamRecord<IN2>>(ser);
+                                       (DeserializationDelegate<?>) new 
NonReusingDeserializationDelegate<>(ser);
                }
 
                // Initialize one deserializer per input channel
@@ -185,7 +184,13 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                                        if (currentChannel < numInputChannels1) 
{
                                                StreamElement recordOrWatermark 
= deserializationDelegate1.getInstance();
                                                if 
(recordOrWatermark.isWatermark()) {
-                                                       
handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel, 
lock);
+                                                       
handleWatermark(streamOperator, recordOrWatermark.asWatermark(), 
currentChannel, lock);
+                                                       continue;
+                                               }
+                                               else if 
(recordOrWatermark.isLatencyMarker()) {
+                                                       synchronized (lock) {
+                                                               
streamOperator.processLatencyMarker1(recordOrWatermark.asLatencyMarker());
+                                                       }
                                                        continue;
                                                }
                                                else {
@@ -203,6 +208,12 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                                                        
handleWatermark(streamOperator, recordOrWatermark.asWatermark(), 
currentChannel, lock);
                                                        continue;
                                                }
+                                               else if 
(recordOrWatermark.isLatencyMarker()) {
+                                                       synchronized (lock) {
+                                                               
streamOperator.processLatencyMarker2(recordOrWatermark.asLatencyMarker());
+                                                       }
+                                                       continue;
+                                               }
                                                else {
                                                        synchronized (lock) {
                                                                
streamOperator.setKeyContextElement2(recordOrWatermark.<IN2>asRecord());

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
new file mode 100644
index 0000000..714bdae
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Special record type carrying a timestamp of its creation time at a source 
operator
+ * and the vertexId and subtask index of the operator.
+ *
+ * At sinks, the marker can be used to approximate the time a record needs to 
travel
+ * through the dataflow.
+ */
+@PublicEvolving
+public final class LatencyMarker extends StreamElement {
+
+       // 
------------------------------------------------------------------------
+
+       /** The time the latency mark is denoting */
+       private final long markedTime;
+
+       private final int vertexID;
+
+       private final int subtaskIndex;
+
+       /**
+        * Creates a latency mark with the given timestamp
+        */
+       public LatencyMarker(long markedTime, int vertexID, int subtaskIndex) {
+               this.markedTime = markedTime;
+               this.vertexID = vertexID;
+               this.subtaskIndex = subtaskIndex;
+       }
+
+       /**
+        * Returns the timestamp marked by the LatencyMarker
+        */
+       public long getMarkedTime() {
+               return markedTime;
+       }
+
+       public int getVertexID() {
+               return vertexID;
+       }
+
+       public int getSubtaskIndex() {
+               return subtaskIndex;
+       }
+
+       // 
------------------------------------------------------------------------
+
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()){
+                       return false;
+               }
+
+               LatencyMarker that = (LatencyMarker) o;
+
+               if (markedTime != that.markedTime) {
+                       return false;
+               }
+               if (vertexID != that.vertexID) {
+                       return false;
+               }
+               return subtaskIndex == that.subtaskIndex;
+
+       }
+
+       @Override
+       public int hashCode() {
+               int result = (int) (markedTime ^ (markedTime >>> 32));
+               result = 31 * result + vertexID;
+               result = 31 * result + subtaskIndex;
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "LatencyMarker{" +
+                               "markedTime=" + markedTime +
+                               ", vertexID=" + vertexID +
+                               ", subtaskIndex=" + subtaskIndex +
+                               '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 832c4b6..95e3ebd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -43,6 +43,7 @@ public final class MultiplexingStreamRecordSerializer<T> 
extends TypeSerializer<
        private static final int TAG_REC_WITH_TIMESTAMP = 0;
        private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
        private static final int TAG_WATERMARK = 2;
+       private static final int TAG_LATENCY_MARKER = 3;
        
        
        private final TypeSerializer<T> typeSerializer;
@@ -95,7 +96,7 @@ public final class MultiplexingStreamRecordSerializer<T> 
extends TypeSerializer<
                        StreamRecord<T> fromRecord = from.asRecord();
                        return 
fromRecord.copy(typeSerializer.copy(fromRecord.getValue()));
                }
-               else if (from.isWatermark()) {
+               else if (from.isWatermark() || from.isLatencyMarker()) {
                        // is immutable
                        return from;
                }
@@ -114,7 +115,7 @@ public final class MultiplexingStreamRecordSerializer<T> 
extends TypeSerializer<
                        fromRecord.copyTo(valueCopy, reuseRecord);
                        return reuse;
                }
-               else if (from.isWatermark()) {
+               else if (from.isWatermark() || from.isLatencyMarker()) {
                        // is immutable
                        return from;
                }
@@ -139,7 +140,11 @@ public final class MultiplexingStreamRecordSerializer<T> 
extends TypeSerializer<
                else if (tag == TAG_WATERMARK) {
                        target.writeLong(source.readLong());
                }
-               else {
+               else if (tag == TAG_LATENCY_MARKER) {
+                       target.writeLong(source.readLong());
+                       target.writeInt(source.readInt());
+                       target.writeInt(source.readInt());
+               } else {
                        throw new IOException("Corrupt stream, found tag: " + 
tag);
                }
        }
@@ -161,6 +166,12 @@ public final class MultiplexingStreamRecordSerializer<T> 
extends TypeSerializer<
                        target.write(TAG_WATERMARK);
                        target.writeLong(value.asWatermark().getTimestamp());
                }
+               else if (value.isLatencyMarker()) {
+                       target.write(TAG_LATENCY_MARKER);
+                       
target.writeLong(value.asLatencyMarker().getMarkedTime());
+                       target.writeInt(value.asLatencyMarker().getVertexID());
+                       
target.writeInt(value.asLatencyMarker().getSubtaskIndex());
+               }
                else {
                        throw new RuntimeException();
                }
@@ -179,6 +190,9 @@ public final class MultiplexingStreamRecordSerializer<T> 
extends TypeSerializer<
                else if (tag == TAG_WATERMARK) {
                        return new Watermark(source.readLong());
                }
+               else if (tag == TAG_LATENCY_MARKER) {
+                       return new LatencyMarker(source.readLong(), 
source.readInt(), source.readInt());
+               }
                else {
                        throw new IOException("Corrupt stream, found tag: " + 
tag);
                }
@@ -203,6 +217,9 @@ public final class MultiplexingStreamRecordSerializer<T> 
extends TypeSerializer<
                else if (tag == TAG_WATERMARK) {
                        return new Watermark(source.readLong());
                }
+               else if (tag == TAG_LATENCY_MARKER) {
+                       return new LatencyMarker(source.readLong(), 
source.readInt(), source.readInt());
+               }
                else {
                        throw new IOException("Corrupt stream, found tag: " + 
tag);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
index f6cccf7..62418bc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
@@ -44,6 +44,14 @@ public abstract class StreamElement {
        }
 
        /**
+        * Checks whether this element is a record.
+        * @return True, if this element is a record, false otherwise.
+        */
+       public final boolean isLatencyMarker() {
+               return getClass() == LatencyMarker.class;
+       }
+
+       /**
         * Casts this element into a StreamRecord.
         * @return This element as a stream record.
         * @throws java.lang.ClassCastException Thrown, if this element is 
actually not a stream record.
@@ -61,4 +69,13 @@ public abstract class StreamElement {
        public final Watermark asWatermark() {
                return (Watermark) this;
        }
+
+       /**
+        * Casts this element into a LatencyMarker.
+        * @return This element as a LatencyMarker.
+        * @throws java.lang.ClassCastException Thrown, if this element is 
actually not a LatencyMarker.
+        */
+       public final LatencyMarker asLatencyMarker() {
+               return (LatencyMarker) this;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 0a6534b..97546b8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -46,7 +46,7 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                                        this, 
                                        configuration.getCheckpointMode(),
                                        getEnvironment().getIOManager(),
-                                       isSerializingTimestamps());
+                                       isSerializingMixedStream());
 
                        // make sure that stream tasks report their I/O 
statistics
                        AccumulatorRegistry registry = 
getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 9e96f5d..7342b6d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -17,6 +17,13 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,24 +36,22 @@ import 
org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
 import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.apache.flink.util.XORShiftRandom;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 /**
  * The {@code OperatorChain} contains all operators that are executed as one 
chain within a single
@@ -72,7 +77,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
                
                final ClassLoader userCodeClassloader = 
containingTask.getUserCodeClassLoader();
                final StreamConfig configuration = 
containingTask.getConfiguration();
-               final boolean enableTimestamps = 
containingTask.isSerializingTimestamps();
+               final boolean enableMultiplexing = 
containingTask.isSerializingMixedStream();
 
                headOperator = 
configuration.getStreamOperator(userCodeClassloader);
 
@@ -94,7 +99,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
                                
                                RecordWriterOutput<?> streamOutput = 
createStreamOutput(
                                                outEdge, 
chainedConfigs.get(outEdge.getSourceId()), i,
-                                               
containingTask.getEnvironment(), enableTimestamps, reporter, 
containingTask.getName());
+                                               
containingTask.getEnvironment(), enableMultiplexing, reporter, 
containingTask.getName());
        
                                this.streamOutputs[i] = streamOutput;
                                streamOutputMap.put(outEdge, streamOutput);
@@ -300,7 +305,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
        
        private static <T> RecordWriterOutput<T> createStreamOutput(
                        StreamEdge edge, StreamConfig upStreamConfig, int 
outputIndex,
-                       Environment taskEnvironment, boolean withTimestamps,
+                       Environment taskEnvironment, boolean enableMultiplexing,
                        AccumulatorRegistry.Reporter reporter, String taskName)
        {
                TypeSerializer<T> outSerializer = 
upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
@@ -317,7 +322,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
                output.setReporter(reporter);
                
output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
                
-               return new RecordWriterOutput<T>(output, outSerializer, 
withTimestamps);
+               return new RecordWriterOutput<>(output, outSerializer, 
enableMultiplexing);
        }
        
        // 
------------------------------------------------------------------------
@@ -357,6 +362,16 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
                }
 
                @Override
+               public void emitLatencyMarker(LatencyMarker latencyMarker) {
+                       try {
+                               operator.processLatencyMarker(latencyMarker);
+                       }
+                       catch (Exception e) {
+                               throw new 
ExceptionInChainedOperatorException(e);
+                       }
+               }
+
+               @Override
                public void close() {
                        try {
                                operator.close();
@@ -393,6 +408,8 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
        private static class BroadcastingOutputCollector<T> implements 
Output<StreamRecord<T>> {
                
                protected final Output<StreamRecord<T>>[] outputs;
+
+               private final Random RNG = new XORShiftRandom();
                
                public BroadcastingOutputCollector(Output<StreamRecord<T>>[] 
outputs) {
                        this.outputs = outputs;
@@ -406,6 +423,18 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
                }
 
                @Override
+               public void emitLatencyMarker(LatencyMarker latencyMarker) {
+                       if(outputs.length <= 0) {
+                               // ignore
+                       } else if(outputs.length == 1) {
+                               outputs[0].emitLatencyMarker(latencyMarker);
+                       } else {
+                               // randomly select an output
+                               
outputs[RNG.nextInt(outputs.length)].emitLatencyMarker(latencyMarker);
+                       }
+               }
+
+               @Override
                public void collect(StreamRecord<T> record) {
                        for (Output<StreamRecord<T>> output : outputs) {
                                output.collect(record);

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index 58e3cb8..a5f94ad 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,6 +75,11 @@ public class StreamIterationTail<IN> extends 
OneInputStreamTask<IN, IN> {
                public void processWatermark(Watermark mark) {
                        // ignore
                }
+
+               @Override
+               public void processLatencyMarker(LatencyMarker latencyMarker) 
throws Exception {
+                       // ignore
+               }
        }
 
        private static class IterationTailOutput<IN> implements 
Output<StreamRecord<IN>> {
@@ -96,6 +102,10 @@ public class StreamIterationTail<IN> extends 
OneInputStreamTask<IN, IN> {
                }
 
                @Override
+               public void emitLatencyMarker(LatencyMarker latencyMarker) {
+               }
+
+               @Override
                public void collect(StreamRecord<IN> record) {
                        try {
                                if (shouldWait) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4893fed..2e6ebf3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -461,6 +461,14 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                return tc == TimeCharacteristic.EventTime | tc == 
TimeCharacteristic.IngestionTime;
        }
 
+       /**
+        * Check if the tasks is sending a mixed stream (of watermarks, latency 
marks and records)
+        * @return true if stream contains more than just records
+        */
+       protected boolean isSerializingMixedStream() {
+               return isSerializingTimestamps() || 
getExecutionConfig().isLatencyTrackingEnabled();
+       }
+       
        // 
------------------------------------------------------------------------
        //  Access to properties and utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index fb08959..bc80607 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -71,7 +71,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
                                this,
                                configuration.getCheckpointMode(),
                                getEnvironment().getIOManager(),
-                               isSerializingTimestamps());
+                               isSerializingMixedStream());
 
                // make sure that stream tasks report their I/O statistics
                AccumulatorRegistry registry = 
getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index c93a439..aa86304 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -41,6 +41,7 @@ import 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.util.EvenOddOutputSelector;
@@ -453,6 +454,16 @@ public class StreamGraphGeneratorTest {
                public void processWatermark2(Watermark mark) throws Exception 
{}
 
                @Override
+               public void processLatencyMarker1(LatencyMarker latencyMarker) 
throws Exception {
+                       // ignore
+               }
+
+               @Override
+               public void processLatencyMarker2(LatencyMarker latencyMarker) 
throws Exception {
+                       // ignore
+               }
+
+               @Override
                public void setup(StreamTask<?, ?> containingTask, StreamConfig 
config, Output<StreamRecord<Integer>> output) {}
        }
 
@@ -476,6 +487,11 @@ public class StreamGraphGeneratorTest {
                public void processWatermark(Watermark mark) {}
 
                @Override
+               public void processLatencyMarker(LatencyMarker latencyMarker) 
throws Exception {
+
+               }
+
+               @Override
                public void setOutputType(TypeInformation<Integer> outTypeInfo, 
ExecutionConfig executionConfig) {
                        tpeInformation = outTypeInfo;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
deleted file mode 100644
index dc8024c..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamCounter}. These test that:
- *
- * <ul>
- *     <li>Timestamps of processed elements match the input timestamp</li>
- *     <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class StreamCounterTest {
-
-       @Test
-       public void testCount() throws Exception {
-               StreamCounter<String> operator = new StreamCounter<String>();
-
-               OneInputStreamOperatorTestHarness<String, Long> testHarness = 
new OneInputStreamOperatorTestHarness<String, Long>(operator);
-
-               long initialTime = 0L;
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
-
-               testHarness.open();
-
-               testHarness.processElement(new StreamRecord<String>("eins", 
initialTime + 1));
-               testHarness.processElement(new StreamRecord<String>("zwei", 
initialTime + 2));
-               testHarness.processWatermark(new Watermark(initialTime + 2));
-               testHarness.processElement(new StreamRecord<String>("drei", 
initialTime + 3));
-
-               expectedOutput.add(new StreamRecord<Long>(1L, initialTime + 1));
-               expectedOutput.add(new StreamRecord<Long>(2L, initialTime + 2));
-               expectedOutput.add(new Watermark(initialTime + 2));
-               expectedOutput.add(new StreamRecord<Long>(3L, initialTime + 3));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-       }
-}

Reply via email to