Repository: flink
Updated Branches:
  refs/heads/master b5a7b3578 -> 92ad53e6a


[FLINK-4812][metrics] Expose currentLowWatermark for all operators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92ad53e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92ad53e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92ad53e6

Branch: refs/heads/master
Commit: 92ad53e6a7e4c9df7055aac89b49ae95d50c5b4c
Parents: b5a7b35
Author: zentol <ches...@apache.org>
Authored: Tue Dec 5 14:20:29 2017 +0100
Committer: zentol <ches...@apache.org>
Committed: Fri Feb 2 16:12:07 2018 +0100

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      | 40 +++++++--
 .../flink/runtime/metrics/MetricNames.java      |  5 ++
 .../util/InterceptingOperatorMetricGroup.java   | 53 +++++++++++
 .../selector/CopyingDirectedOutput.java         |  2 +-
 .../api/collector/selector/DirectedOutput.java  | 17 +++-
 .../runtime/io/RecordWriterOutput.java          | 13 ++-
 .../runtime/io/StreamInputProcessor.java        | 36 ++------
 .../runtime/io/StreamTwoInputProcessor.java     | 42 +++------
 .../runtime/metrics/MinWatermarkGauge.java      | 40 +++++++++
 .../runtime/metrics/WatermarkGauge.java         | 38 ++++++++
 .../runtime/tasks/OneInputStreamTask.java       | 12 ++-
 .../streaming/runtime/tasks/OperatorChain.java  | 63 ++++++++++---
 .../runtime/tasks/TwoInputStreamTask.java       | 22 ++++-
 .../runtime/metrics/MinWatermarkGaugeTest.java  | 46 ++++++++++
 .../runtime/metrics/WatermarkGaugeTest.java     | 38 ++++++++
 .../runtime/tasks/OneInputStreamTaskTest.java   | 89 ++++++++++++++++++
 .../runtime/tasks/TwoInputStreamTaskTest.java   | 94 ++++++++++++++++++++
 17 files changed, 559 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 00c01b7..62adeb1 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1190,12 +1190,7 @@ Thus, in order to infer the metric identifier:
   </thead>
   <tbody>
     <tr>
-      <th rowspan="7"><strong>Task</strong></th>
-      <td>currentLowWatermark</td>
-      <td>The lowest watermark this task has received (in milliseconds).</td>
-      <td>Gauge</td>
-    </tr>
-    <tr>
+      <th rowspan="6"><strong>Task</strong></th>
       <td>numBytesInLocal</td>
       <td>The total number of bytes this task has read from a local 
source.</td>
       <td>Counter</td>
@@ -1252,7 +1247,38 @@ Thus, in order to infer the metric identifier:
       <td>Counter</td>
     </tr>
     <tr>
-      <th rowspan="2"><strong>Operator</strong></th>
+      <th rowspan="6"><strong>Operator</strong></th>
+      <td>currentInputWatermark</td>
+      <td>
+        The last watermark this operator has received (in milliseconds).
+        <p><strong>Note:</strong> For operators with 2 inputs this is the 
minimum of the last received watermarks.</p>
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>currentInput1Watermark</td>
+      <td>
+        The last watermark this operator has received in its first input (in 
milliseconds).
+        <p><strong>Note:</strong> Only for operators with 2 inputs.</p>
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>currentInput2Watermark</td>
+      <td>
+        The last watermark this operator has received in its second input (in 
milliseconds).
+        <p><strong>Note:</strong> Only for operators with 2 inputs.</p>
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>currentOutputWatermark</td>
+      <td>
+        The last watermark this operator has emitted (in milliseconds).
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
       <td>latency</td>
       <td>The latency distributions from all incoming sources (in 
milliseconds).</td>
       <td>Histogram</td>

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 300b4b0..d15a0f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -39,4 +39,9 @@ public class MetricNames {
        public static final String IO_NUM_BYTES_IN_LOCAL_RATE = 
IO_NUM_BYTES_IN_LOCAL + SUFFIX_RATE;
        public static final String IO_NUM_BYTES_IN_REMOTE_RATE = 
IO_NUM_BYTES_IN_REMOTE + SUFFIX_RATE;
        public static final String IO_NUM_BYTES_OUT_RATE = IO_NUM_BYTES_OUT + 
SUFFIX_RATE;
+
+       public static final String IO_CURRENT_INPUT_WATERMARK = 
"currentInputWatermark";
+       public static final String IO_CURRENT_INPUT_1_WATERMARK = 
"currentInput1Watermark";
+       public static final String IO_CURRENT_INPUT_2_WATERMARK = 
"currentInput2Watermark";
+       public static final String IO_CURRENT_OUTPUT_WATERMARK = 
"currentOutputWatermark";
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingOperatorMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingOperatorMetricGroup.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingOperatorMetricGroup.java
new file mode 100644
index 0000000..f4d0581
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingOperatorMetricGroup.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An {@link OperatorMetricGroup} that exposes all registered metrics.
+ */
+public class InterceptingOperatorMetricGroup extends 
UnregisteredMetricGroups.UnregisteredOperatorMetricGroup {
+
+       private Map<String, Metric> intercepted;
+
+       /**
+        * Returns the registered metric for the given name, or null if it was 
never registered.
+        *
+        * @param name metric name
+        * @return registered metric for the given name, or null if it was 
never registered
+        */
+       public Metric get(String name) {
+               return intercepted.get(name);
+       }
+
+       @Override
+       protected void addMetric(String name, Metric metric) {
+               if (intercepted == null) {
+                       intercepted = new HashMap<>();
+               }
+               intercepted.put(name, metric);
+               super.addMetric(name, metric);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
index 5f7e278..3533cd1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
@@ -36,7 +36,7 @@ public class CopyingDirectedOutput<OUT> extends 
DirectedOutput<OUT> {
        @SuppressWarnings({"unchecked", "rawtypes"})
        public CopyingDirectedOutput(
                        List<OutputSelector<OUT>> outputSelectors,
-                       List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> 
outputs) {
+                       List<? extends Tuple2<? extends 
Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
                super(outputSelectors, outputs);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
index ec4700f..6512174 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
@@ -18,11 +18,14 @@
 package org.apache.flink.streaming.api.collector.selector;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.XORShiftRandom;
 
@@ -39,7 +42,7 @@ import java.util.Set;
  * Wrapping {@link Output} that forwards to other {@link Output Outputs } 
based on a list of
  * {@link OutputSelector OutputSelectors}.
  */
-public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
+public class DirectedOutput<OUT> implements 
OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
 
        protected final OutputSelector<OUT>[] outputSelectors;
 
@@ -51,10 +54,12 @@ public class DirectedOutput<OUT> implements 
Output<StreamRecord<OUT>> {
 
        private final Random random = new XORShiftRandom();
 
+       protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
+
        @SuppressWarnings({"unchecked", "rawtypes"})
        public DirectedOutput(
                        List<OutputSelector<OUT>> outputSelectors,
-                       List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> 
outputs) {
+                       List<? extends Tuple2<? extends 
Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
                this.outputSelectors = outputSelectors.toArray(new 
OutputSelector[outputSelectors.size()]);
 
                this.allOutputs = new Output[outputs.size()];
@@ -65,7 +70,7 @@ public class DirectedOutput<OUT> implements 
Output<StreamRecord<OUT>> {
                HashSet<Output<StreamRecord<OUT>>> selectAllOutputs = new 
HashSet<Output<StreamRecord<OUT>>>();
                HashMap<String, ArrayList<Output<StreamRecord<OUT>>>> outputMap 
= new HashMap<String, ArrayList<Output<StreamRecord<OUT>>>>();
 
-               for (Tuple2<Output<StreamRecord<OUT>>, StreamEdge> outputPair : 
outputs) {
+               for (Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge> 
outputPair : outputs) {
                        final Output<StreamRecord<OUT>> output = outputPair.f0;
                        final StreamEdge edge = outputPair.f1;
 
@@ -100,6 +105,7 @@ public class DirectedOutput<OUT> implements 
Output<StreamRecord<OUT>> {
 
        @Override
        public void emitWatermark(Watermark mark) {
+               watermarkGauge.setCurrentWatermark(mark.getTimestamp());
                for (Output<StreamRecord<OUT>> out : allOutputs) {
                        out.emitWatermark(mark);
                }
@@ -149,4 +155,9 @@ public class DirectedOutput<OUT> implements 
Output<StreamRecord<OUT>> {
                        out.close();
                }
        }
+
+       @Override
+       public Gauge<Long> getWatermarkGauge() {
+               return watermarkGauge;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 3b70be7..45bbd66 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -19,17 +19,20 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
 import org.apache.flink.util.OutputTag;
 
 import java.io.IOException;
@@ -40,7 +43,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * Implementation of {@link Output} that sends data using a {@link 
RecordWriter}.
  */
 @Internal
-public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
+public class RecordWriterOutput<OUT> implements 
OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
 
        private StreamRecordWriter<SerializationDelegate<StreamElement>> 
recordWriter;
 
@@ -50,6 +53,8 @@ public class RecordWriterOutput<OUT> implements 
Output<StreamRecord<OUT>> {
 
        private final OutputTag outputTag;
 
+       private final WatermarkGauge watermarkGauge = new WatermarkGauge();
+
        @SuppressWarnings("unchecked")
        public RecordWriterOutput(
                        
StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
@@ -108,6 +113,7 @@ public class RecordWriterOutput<OUT> implements 
Output<StreamRecord<OUT>> {
 
        @Override
        public void emitWatermark(Watermark mark) {
+               watermarkGauge.setCurrentWatermark(mark.getTimestamp());
                serializationDelegate.setInstance(mark);
 
                if (streamStatusProvider.getStreamStatus().isActive()) {
@@ -158,4 +164,9 @@ public class RecordWriterOutput<OUT> implements 
Output<StreamRecord<OUT>> {
        public void clearBuffers() {
                recordWriter.clearBuffers();
        }
+
+       @Override
+       public Gauge<Long> getWatermarkGauge() {
+               return watermarkGauge;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 9f526a5..dc3dc5c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -42,6 +41,7 @@ import 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -106,7 +106,7 @@ public class StreamInputProcessor<IN> {
 
        // ---------------- Metrics ------------------
 
-       private long lastEmittedWatermark;
+       private final WatermarkGauge watermarkGauge;
        private Counter numRecordsIn;
 
        private boolean isFinished;
@@ -121,7 +121,9 @@ public class StreamInputProcessor<IN> {
                        IOManager ioManager,
                        Configuration taskManagerConfig,
                        StreamStatusMaintainer streamStatusMaintainer,
-                       OneInputStreamOperator<IN, ?> streamOperator) throws 
IOException {
+                       OneInputStreamOperator<IN, ?> streamOperator,
+                       TaskIOMetricGroup metrics,
+                       WatermarkGauge watermarkGauge) throws IOException {
 
                InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
@@ -160,14 +162,15 @@ public class StreamInputProcessor<IN> {
 
                this.numInputChannels = inputGate.getNumberOfInputChannels();
 
-               this.lastEmittedWatermark = Long.MIN_VALUE;
-
                this.streamStatusMaintainer = 
checkNotNull(streamStatusMaintainer);
                this.streamOperator = checkNotNull(streamOperator);
 
                this.statusWatermarkValve = new StatusWatermarkValve(
                                numInputChannels,
                                new 
ForwardingValveOutputHandler(streamOperator, lock));
+
+               this.watermarkGauge = watermarkGauge;
+               metrics.gauge("checkpointAlignmentTime", 
barrierHandler::getAlignmentDurationNanos);
        }
 
        public boolean processInput() throws Exception {
@@ -247,27 +250,6 @@ public class StreamInputProcessor<IN> {
                }
        }
 
-       /**
-        * Sets the metric group for this StreamInputProcessor.
-        *
-        * @param metrics metric group
-        */
-       public void setMetricGroup(TaskIOMetricGroup metrics) {
-               metrics.gauge("currentLowWatermark", new Gauge<Long>() {
-                       @Override
-                       public Long getValue() {
-                               return lastEmittedWatermark;
-                       }
-               });
-
-               metrics.gauge("checkpointAlignmentTime", new Gauge<Long>() {
-                       @Override
-                       public Long getValue() {
-                               return 
barrierHandler.getAlignmentDurationNanos();
-                       }
-               });
-       }
-
        public void cleanup() throws IOException {
                // clear the buffers first. this part should not ever fail
                for (RecordDeserializer<?> deserializer : recordDeserializers) {
@@ -295,7 +277,7 @@ public class StreamInputProcessor<IN> {
                public void handleWatermark(Watermark watermark) {
                        try {
                                synchronized (lock) {
-                                       lastEmittedWatermark = 
watermark.getTimestamp();
+                                       
watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                                        operator.processWatermark(watermark);
                                }
                        } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index a3d9236..494a82a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -42,6 +41,7 @@ import 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -120,8 +120,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 
        // ---------------- Metrics ------------------
 
-       private long lastEmittedWatermark1;
-       private long lastEmittedWatermark2;
+       private final WatermarkGauge input1WatermarkGauge;
+       private final WatermarkGauge input2WatermarkGauge;
 
        private Counter numRecordsIn;
 
@@ -139,7 +139,10 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                        IOManager ioManager,
                        Configuration taskManagerConfig,
                        StreamStatusMaintainer streamStatusMaintainer,
-                       TwoInputStreamOperator<IN1, IN2, ?> streamOperator) 
throws IOException {
+                       TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
+                       TaskIOMetricGroup metrics,
+                       WatermarkGauge input1WatermarkGauge,
+                       WatermarkGauge input2WatermarkGauge) throws IOException 
{
 
                final InputGate inputGate = 
InputGateUtil.createInputGate(inputGates1, inputGates2);
 
@@ -188,9 +191,6 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                this.numInputChannels1 = numInputChannels1;
                this.numInputChannels2 = inputGate.getNumberOfInputChannels() - 
numInputChannels1;
 
-               this.lastEmittedWatermark1 = Long.MIN_VALUE;
-               this.lastEmittedWatermark2 = Long.MIN_VALUE;
-
                this.firstStatus = StreamStatus.ACTIVE;
                this.secondStatus = StreamStatus.ACTIVE;
 
@@ -200,6 +200,9 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                this.statusWatermarkValve1 = new 
StatusWatermarkValve(numInputChannels1, new 
ForwardingValveOutputHandler1(streamOperator, lock));
                this.statusWatermarkValve2 = new 
StatusWatermarkValve(numInputChannels2, new 
ForwardingValveOutputHandler2(streamOperator, lock));
 
+               this.input1WatermarkGauge = input1WatermarkGauge;
+               this.input2WatermarkGauge = input2WatermarkGauge;
+               metrics.gauge("checkpointAlignmentTime", 
barrierHandler::getAlignmentDurationNanos);
        }
 
        public boolean processInput() throws Exception {
@@ -312,27 +315,6 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                }
        }
 
-       /**
-        * Sets the metric group for this StreamTwoInputProcessor.
-        *
-        * @param metrics metric group
-        */
-       public void setMetricGroup(TaskIOMetricGroup metrics) {
-               metrics.gauge("currentLowWatermark", new Gauge<Long>() {
-                       @Override
-                       public Long getValue() {
-                               return Math.min(lastEmittedWatermark1, 
lastEmittedWatermark2);
-                       }
-               });
-
-               metrics.gauge("checkpointAlignmentTime", new Gauge<Long>() {
-                       @Override
-                       public Long getValue() {
-                               return 
barrierHandler.getAlignmentDurationNanos();
-                       }
-               });
-       }
-
        public void cleanup() throws IOException {
                // clear the buffers first. this part should not ever fail
                for (RecordDeserializer<?> deserializer : recordDeserializers) {
@@ -360,7 +342,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                public void handleWatermark(Watermark watermark) {
                        try {
                                synchronized (lock) {
-                                       lastEmittedWatermark1 = 
watermark.getTimestamp();
+                                       
input1WatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                                        operator.processWatermark1(watermark);
                                }
                        } catch (Exception e) {
@@ -404,7 +386,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                public void handleWatermark(Watermark watermark) {
                        try {
                                synchronized (lock) {
-                                       lastEmittedWatermark2 = 
watermark.getTimestamp();
+                                       
input2WatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                                        operator.processWatermark2(watermark);
                                }
                        } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGauge.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGauge.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGauge.java
new file mode 100644
index 0000000..6736dca
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGauge.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * A {@link Gauge} for exposing the minimum watermark of a {@link 
WatermarkGauge} pair.
+ */
+public class MinWatermarkGauge implements Gauge<Long> {
+
+       private WatermarkGauge watermarkGauge1;
+       private WatermarkGauge watermarkGauge2;
+
+       public MinWatermarkGauge(WatermarkGauge watermarkGauge1, WatermarkGauge 
watermarkGauge2) {
+               this.watermarkGauge1 = watermarkGauge1;
+               this.watermarkGauge2 = watermarkGauge2;
+       }
+
+       @Override
+       public Long getValue() {
+               return Math.min(watermarkGauge1.getValue(), 
watermarkGauge2.getValue());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/WatermarkGauge.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/WatermarkGauge.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/WatermarkGauge.java
new file mode 100644
index 0000000..42c7000
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/WatermarkGauge.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * A {@link Gauge} for exposing the current input/output watermark.
+ */
+public class WatermarkGauge implements Gauge<Long> {
+
+       private long currentWatermark = Long.MIN_VALUE;
+
+       public void setCurrentWatermark(long watermark) {
+               this.currentWatermark = watermark;
+       }
+
+       @Override
+       public Long getValue() {
+               return currentWatermark;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index bc8aaae..26088e4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -23,9 +23,11 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 
 import javax.annotation.Nullable;
 
@@ -39,6 +41,8 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
 
        private volatile boolean running = true;
 
+       private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();
+
        /**
         * Constructor for initialization, possibly with initial state 
(recovery / savepoint / etc).
         *
@@ -84,11 +88,11 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                                        getEnvironment().getIOManager(),
                                        
getEnvironment().getTaskManagerInfo().getConfiguration(),
                                        getStreamStatusMaintainer(),
-                                       this.headOperator);
-
-                       // make sure that stream tasks report their I/O 
statistics
-                       
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
+                                       this.headOperator,
+                                       
getEnvironment().getMetricGroup().getIOMetricGroup(),
+                                       inputWatermarkGauge);
                }
+               
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
this.inputWatermarkGauge);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 141a623..fdeea17 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -21,12 +21,14 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -41,6 +43,7 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
@@ -77,7 +80,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
 
        private final RecordWriterOutput<?>[] streamOutputs;
 
-       private final Output<StreamRecord<OUT>> chainEntryPoint;
+       private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> 
chainEntryPoint;
 
        private final OP headOperator;
 
@@ -134,8 +137,10 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                                allOps);
 
                        if (headOperator != null) {
-                               Output output = getChainEntryPoint();
+                               WatermarkGaugeExposingOutput<StreamRecord<OUT>> 
output = getChainEntryPoint();
                                headOperator.setup(containingTask, 
configuration, output);
+
+                               
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, 
output.getWatermarkGauge());
                        }
 
                        // add head operator to end of chain
@@ -209,7 +214,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                return allOperators;
        }
 
-       public Output<StreamRecord<OUT>> getChainEntryPoint() {
+       public WatermarkGaugeExposingOutput<StreamRecord<OUT>> 
getChainEntryPoint() {
                return chainEntryPoint;
        }
 
@@ -259,21 +264,21 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
        //  initialization utilities
        // 
------------------------------------------------------------------------
 
-       private <T> Output<StreamRecord<T>> createOutputCollector(
+       private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> 
createOutputCollector(
                        StreamTask<?, ?> containingTask,
                        StreamConfig operatorConfig,
                        Map<Integer, StreamConfig> chainedConfigs,
                        ClassLoader userCodeClassloader,
                        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
                        List<StreamOperator<?>> allOperators) {
-               List<Tuple2<Output<StreamRecord<T>>, StreamEdge>> allOutputs = 
new ArrayList<>(4);
+               List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, 
StreamEdge>> allOutputs = new ArrayList<>(4);
 
                // create collectors for the network outputs
                for (StreamEdge outputEdge : 
operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
                        @SuppressWarnings("unchecked")
                        RecordWriterOutput<T> output = (RecordWriterOutput<T>) 
streamOutputs.get(outputEdge);
 
-                       allOutputs.add(new Tuple2<Output<StreamRecord<T>>, 
StreamEdge>(output, outputEdge));
+                       allOutputs.add(new Tuple2<>(output, outputEdge));
                }
 
                // Create collectors for the chained outputs
@@ -281,7 +286,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                        int outputId = outputEdge.getTargetId();
                        StreamConfig chainedOpConfig = 
chainedConfigs.get(outputId);
 
-                       Output<StreamRecord<T>> output = createChainedOperator(
+                       WatermarkGaugeExposingOutput<StreamRecord<T>> output = 
createChainedOperator(
                                containingTask,
                                chainedOpConfig,
                                chainedConfigs,
@@ -336,7 +341,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                }
        }
 
-       private <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
+       private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> 
createChainedOperator(
                        StreamTask<?, ?> containingTask,
                        StreamConfig operatorConfig,
                        Map<Integer, StreamConfig> chainedConfigs,
@@ -345,7 +350,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                        List<StreamOperator<?>> allOperators,
                        OutputTag<IN> outputTag) {
                // create the output that the operator writes to first. this 
may recursively create more operators
-               Output<StreamRecord<OUT>> output = createOutputCollector(
+               WatermarkGaugeExposingOutput<StreamRecord<OUT>> 
chainedOperatorOutput = createOutputCollector(
                        containingTask,
                        operatorConfig,
                        chainedConfigs,
@@ -356,17 +361,23 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                // now create the operator and give it the output collector to 
write its output to
                OneInputStreamOperator<IN, OUT> chainedOperator = 
operatorConfig.getStreamOperator(userCodeClassloader);
 
-               chainedOperator.setup(containingTask, operatorConfig, output);
+               chainedOperator.setup(containingTask, operatorConfig, 
chainedOperatorOutput);
 
                allOperators.add(chainedOperator);
 
+               WatermarkGaugeExposingOutput<StreamRecord<IN>> 
currentOperatorOutput;
                if (containingTask.getExecutionConfig().isObjectReuseEnabled()) 
{
-                       return new ChainingOutput<>(chainedOperator, this, 
outputTag);
+                       currentOperatorOutput = new 
ChainingOutput<>(chainedOperator, this, outputTag);
                }
                else {
                        TypeSerializer<IN> inSerializer = 
operatorConfig.getTypeSerializerIn1(userCodeClassloader);
-                       return new CopyingChainingOutput<>(chainedOperator, 
inSerializer, outputTag, this);
+                       currentOperatorOutput = new 
CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
                }
+
+               
chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
currentOperatorOutput.getWatermarkGauge());
+               
chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, 
chainedOperatorOutput.getWatermarkGauge());
+
+               return currentOperatorOutput;
        }
 
        private <T> RecordWriterOutput<T> createStreamOutput(
@@ -414,10 +425,20 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
        //  Collectors for output chaining
        // 
------------------------------------------------------------------------
 
-       private static class ChainingOutput<T> implements 
Output<StreamRecord<T>> {
+       /**
+        * An {@link Output} that measures the last emitted watermark with a 
{@link WatermarkGauge}.
+        *
+        * @param <T> The type of the elements that can be emitted.
+        */
+       public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
+               Gauge<Long> getWatermarkGauge();
+       }
+
+       private static class ChainingOutput<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
                protected final OneInputStreamOperator<T, ?> operator;
                protected final Counter numRecordsIn;
+               protected final WatermarkGauge watermarkGauge = new 
WatermarkGauge();
 
                protected final StreamStatusProvider streamStatusProvider;
 
@@ -487,6 +508,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                @Override
                public void emitWatermark(Watermark mark) {
                        try {
+                               
watermarkGauge.setCurrentWatermark(mark.getTimestamp());
                                if 
(streamStatusProvider.getStreamStatus().isActive()) {
                                        operator.processWatermark(mark);
                                }
@@ -515,6 +537,11 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                                throw new 
ExceptionInChainedOperatorException(e);
                        }
                }
+
+               @Override
+               public Gauge<Long> getWatermarkGauge() {
+                       return watermarkGauge;
+               }
        }
 
        private static final class CopyingChainingOutput<T> extends 
ChainingOutput<T> {
@@ -582,7 +609,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                }
        }
 
-       private static class BroadcastingOutputCollector<T> implements 
Output<StreamRecord<T>> {
+       private static class BroadcastingOutputCollector<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
                protected final Output<StreamRecord<T>>[] outputs;
 
@@ -590,6 +617,8 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
 
                private final StreamStatusProvider streamStatusProvider;
 
+               private final WatermarkGauge watermarkGauge = new 
WatermarkGauge();
+
                public BroadcastingOutputCollector(
                                Output<StreamRecord<T>>[] outputs,
                                StreamStatusProvider streamStatusProvider) {
@@ -599,6 +628,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
 
                @Override
                public void emitWatermark(Watermark mark) {
+                       watermarkGauge.setCurrentWatermark(mark.getTimestamp());
                        if (streamStatusProvider.getStreamStatus().isActive()) {
                                for (Output<StreamRecord<T>> output : outputs) {
                                        output.emitWatermark(mark);
@@ -619,6 +649,11 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                }
 
                @Override
+               public Gauge<Long> getWatermarkGauge() {
+                       return watermarkGauge;
+               }
+
+               @Override
                public void collect(StreamRecord<T> record) {
                        for (Output<StreamRecord<T>> output : outputs) {
                                output.collect(record);

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index c1ba7b2..bd878f6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -21,10 +21,13 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
+import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -39,6 +42,10 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
 
        private volatile boolean running = true;
 
+       private final WatermarkGauge input1WatermarkGauge;
+       private final WatermarkGauge input2WatermarkGauge;
+       private final MinWatermarkGauge minInputWatermarkGauge;
+
        /**
         * Constructor for initialization, possibly with initial state 
(recovery / savepoint / etc).
         *
@@ -46,6 +53,9 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
         */
        public TwoInputStreamTask(Environment env) {
                super(env);
+               input1WatermarkGauge = new WatermarkGauge();
+               input2WatermarkGauge = new WatermarkGauge();
+               minInputWatermarkGauge = new 
MinWatermarkGauge(input1WatermarkGauge, input2WatermarkGauge);
        }
 
        @Override
@@ -87,10 +97,14 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
                                getEnvironment().getIOManager(),
                                
getEnvironment().getTaskManagerInfo().getConfiguration(),
                                getStreamStatusMaintainer(),
-                               this.headOperator);
-
-               // make sure that stream tasks report their I/O statistics
-               
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
+                               this.headOperator,
+                               
getEnvironment().getMetricGroup().getIOMetricGroup(),
+                               input1WatermarkGauge,
+                               input2WatermarkGauge);
+
+               
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
minInputWatermarkGauge);
+               
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
+               
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, 
input2WatermarkGauge);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGaugeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGaugeTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGaugeTest.java
new file mode 100644
index 0000000..ae673a8
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGaugeTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.metrics;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link MinWatermarkGauge}.
+ */
+public class MinWatermarkGaugeTest {
+
+       @Test
+       public void testSetCurrentLowWatermark() {
+               WatermarkGauge metric1 = new WatermarkGauge();
+               WatermarkGauge metric2 = new WatermarkGauge();
+               MinWatermarkGauge metric = new MinWatermarkGauge(metric1, 
metric2);
+
+               Assert.assertEquals(Long.MIN_VALUE, 
metric.getValue().longValue());
+
+               metric1.setCurrentWatermark(1);
+               Assert.assertEquals(Long.MIN_VALUE, 
metric.getValue().longValue());
+
+               metric2.setCurrentWatermark(2);
+               Assert.assertEquals(1L, metric.getValue().longValue());
+
+               metric1.setCurrentWatermark(3);
+               Assert.assertEquals(2L, metric.getValue().longValue());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/WatermarkGaugeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/WatermarkGaugeTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/WatermarkGaugeTest.java
new file mode 100644
index 0000000..c8cec88
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/WatermarkGaugeTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.metrics;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link WatermarkGauge}.
+ */
+public class WatermarkGaugeTest {
+
+       @Test
+       public void testSetCurrentLowWatermark() {
+               WatermarkGauge metric = new WatermarkGauge();
+
+               Assert.assertEquals(Long.MIN_VALUE, 
metric.getValue().longValue());
+
+               metric.setCurrentWatermark(64);
+               Assert.assertEquals(64, metric.getValue().longValue());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 9a7b699..3e0459d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.ListState;
@@ -28,12 +29,19 @@ import 
org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.TestTaskStateManager;
@@ -603,6 +611,87 @@ public class OneInputStreamTaskTest extends TestLogger {
                }
        }
 
+       @Test
+       public void testWatermarkMetrics() throws Exception {
+               final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+               OneInputStreamOperator<String, String> headOperator = new 
WatermarkMetricOperator();
+               OperatorID headOperatorId = new OperatorID();
+
+               OneInputStreamOperator<String, String> chainedOperator = new 
WatermarkMetricOperator();
+               OperatorID chainedOperatorId = new OperatorID();
+
+               testHarness.setupOperatorChain(headOperatorId, headOperator)
+                       .chain(chainedOperatorId, chainedOperator, 
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                       .finish();
+
+               InterceptingOperatorMetricGroup headOperatorMetricGroup = new 
InterceptingOperatorMetricGroup();
+               InterceptingOperatorMetricGroup chainedOperatorMetricGroup = 
new InterceptingOperatorMetricGroup();
+               TaskMetricGroup taskMetricGroup = new 
UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
+                       @Override
+                       public OperatorMetricGroup addOperator(OperatorID id, 
String name) {
+                               if (id.equals(headOperatorId)) {
+                                       return headOperatorMetricGroup;
+                               } else if (id.equals(chainedOperatorId)) {
+                                       return chainedOperatorMetricGroup;
+                               } else {
+                                       return super.addOperator(id, name);
+                               }
+                       }
+               };
+
+               StreamMockEnvironment env = new StreamMockEnvironment(
+                       testHarness.jobConfig, testHarness.taskConfig, 
testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, 
new TestTaskStateManager()) {
+                       @Override
+                       public TaskMetricGroup getMetricGroup() {
+                               return taskMetricGroup;
+                       }
+               };
+
+               testHarness.invoke(env);
+               testHarness.waitForTaskRunning();
+
+               Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) 
headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
+               Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) 
headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
+               Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) 
chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
+               Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) 
chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
+
+               Assert.assertEquals(Long.MIN_VALUE, 
headInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(Long.MIN_VALUE, 
headOutputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(Long.MIN_VALUE, 
chainedInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(Long.MIN_VALUE, 
chainedOutputWatermarkGauge.getValue().longValue());
+
+               testHarness.processElement(new Watermark(1L));
+               testHarness.waitForInputProcessing();
+               Assert.assertEquals(1L, 
headInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(2L, 
headOutputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(2L, 
chainedInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(4L, 
chainedOutputWatermarkGauge.getValue().longValue());
+
+               testHarness.processElement(new Watermark(2L));
+               testHarness.waitForInputProcessing();
+               Assert.assertEquals(2L, 
headInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(4L, 
headOutputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(4L, 
chainedInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(8L, 
chainedOutputWatermarkGauge.getValue().longValue());
+
+               testHarness.endInput();
+               testHarness.waitForTaskCompletion();
+       }
+
+       static class WatermarkMetricOperator extends 
AbstractStreamOperator<String> implements OneInputStreamOperator<String, 
String> {
+
+               @Override
+               public void processElement(StreamRecord<String> element) throws 
Exception {
+                       output.collect(element);
+               }
+
+               @Override
+               public void processWatermark(Watermark mark) throws Exception {
+                       output.emitWatermark(new Watermark(mark.getTimestamp() 
* 2));
+               }
+       }
+
        
//==============================================================================================
        // Utility functions and classes
        
//==============================================================================================

http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index a267c5a..58e28b3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -18,12 +18,21 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -373,6 +382,91 @@ public class TwoInputStreamTaskTest {
                                testHarness.getOutput());
        }
 
+       @Test
+       public void testWatermarkMetrics() throws Exception {
+               final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness = new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               CoStreamMap<String, Integer, String> headOperator = new 
CoStreamMap<>(new IdentityMap());
+               final OperatorID headOperatorId = new OperatorID();
+
+               OneInputStreamTaskTest.WatermarkMetricOperator chainedOperator 
= new OneInputStreamTaskTest.WatermarkMetricOperator();
+               OperatorID chainedOperatorId = new OperatorID();
+
+               testHarness.setupOperatorChain(headOperatorId, headOperator)
+                       .chain(chainedOperatorId, chainedOperator, 
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                       .finish();
+
+               InterceptingOperatorMetricGroup headOperatorMetricGroup = new 
InterceptingOperatorMetricGroup();
+               InterceptingOperatorMetricGroup chainedOperatorMetricGroup = 
new InterceptingOperatorMetricGroup();
+               TaskMetricGroup taskMetricGroup = new 
UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
+                       @Override
+                       public OperatorMetricGroup addOperator(OperatorID id, 
String name) {
+                               if (id.equals(headOperatorId)) {
+                                       return headOperatorMetricGroup;
+                               } else if (id.equals(chainedOperatorId)) {
+                                       return chainedOperatorMetricGroup;
+                               } else {
+                                       return super.addOperator(id, name);
+                               }
+                       }
+               };
+
+               StreamMockEnvironment env = new StreamMockEnvironment(
+                       testHarness.jobConfig, testHarness.taskConfig, 
testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, 
new TestTaskStateManager()) {
+                       @Override
+                       public TaskMetricGroup getMetricGroup() {
+                               return taskMetricGroup;
+                       }
+               };
+
+               testHarness.invoke(env);
+               testHarness.waitForTaskRunning();
+
+               Gauge<Long> headInput1WatermarkGauge = (Gauge<Long>) 
headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_1_WATERMARK);
+               Gauge<Long> headInput2WatermarkGauge = (Gauge<Long>) 
headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_2_WATERMARK);
+               Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) 
headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
+               Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) 
headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
+               Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) 
chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
+               Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) 
chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
+
+               Assert.assertEquals(Long.MIN_VALUE, 
headInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(Long.MIN_VALUE, 
headInput1WatermarkGauge.getValue().longValue());
+               Assert.assertEquals(Long.MIN_VALUE, 
headInput2WatermarkGauge.getValue().longValue());
+               Assert.assertEquals(Long.MIN_VALUE, 
headOutputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(Long.MIN_VALUE, 
chainedInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(Long.MIN_VALUE, 
chainedOutputWatermarkGauge.getValue().longValue());
+
+               testHarness.processElement(new Watermark(1L), 0, 0);
+               testHarness.waitForInputProcessing();
+               Assert.assertEquals(Long.MIN_VALUE, 
headInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(1L, 
headInput1WatermarkGauge.getValue().longValue());
+               Assert.assertEquals(Long.MIN_VALUE, 
headInput2WatermarkGauge.getValue().longValue());
+               Assert.assertEquals(Long.MIN_VALUE, 
headOutputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(Long.MIN_VALUE, 
chainedInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(Long.MIN_VALUE, 
chainedOutputWatermarkGauge.getValue().longValue());
+
+               testHarness.processElement(new Watermark(2L), 1, 0);
+               testHarness.waitForInputProcessing();
+               Assert.assertEquals(1L, 
headInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(1L, 
headInput1WatermarkGauge.getValue().longValue());
+               Assert.assertEquals(2L, 
headInput2WatermarkGauge.getValue().longValue());
+               Assert.assertEquals(1L, 
headOutputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(1L, 
chainedInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(2L, 
chainedOutputWatermarkGauge.getValue().longValue());
+
+               testHarness.processElement(new Watermark(3L), 0, 0);
+               testHarness.waitForInputProcessing();
+               Assert.assertEquals(2L, 
headInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(3L, 
headInput1WatermarkGauge.getValue().longValue());
+               Assert.assertEquals(2L, 
headInput2WatermarkGauge.getValue().longValue());
+               Assert.assertEquals(2L, 
headOutputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(2L, 
chainedInputWatermarkGauge.getValue().longValue());
+               Assert.assertEquals(4L, 
chainedOutputWatermarkGauge.getValue().longValue());
+
+               testHarness.endInput();
+               testHarness.waitForTaskCompletion();
+       }
+
        // This must only be used in one test, otherwise the static fields will 
be changed
        // by several tests concurrently
        private static class TestOpenCloseMapFunction extends 
RichCoMapFunction<String, Integer, String> {

Reply via email to