[FLINK-9789][metrics] Ensure uniqueness of watermark metrics

This closes #6292.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60df251a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60df251a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60df251a

Branch: refs/heads/master
Commit: 60df251ad34ac033ed6c4423a69765739cb04199
Parents: 2fbe562
Author: zentol <ches...@apache.org>
Authored: Tue Jul 10 13:27:34 2018 +0200
Committer: zentol <ches...@apache.org>
Committed: Wed Jul 11 12:05:07 2018 +0200

----------------------------------------------------------------------
 .../util/InterceptingTaskMetricGroup.java       | 53 ++++++++++++++++++++
 .../runtime/tasks/OneInputStreamTask.java       |  3 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  5 +-
 .../runtime/tasks/TwoInputStreamTask.java       |  3 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   | 18 ++++++-
 .../runtime/tasks/TwoInputStreamTaskTest.java   | 22 +++++++-
 6 files changed, 98 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java
new file mode 100644
index 0000000..29454b4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link TaskMetricGroup} that exposes all registered metrics.
+ */
+public class InterceptingTaskMetricGroup extends UnregisteredMetricGroups.UnregisteredTaskMetricGroup {
+
+       private Map<String, Metric> intercepted;
+
+       /**
+        * Returns the registered metric for the given name, or null if it was never registered.
+        *
+        * @param name metric name
+        * @return registered metric for the given name, or null if it was never registered
+        */
+       public Metric get(String name) {
+               return intercepted.get(name);
+       }
+
+       @Override
+       protected void addMetric(String name, Metric metric) {
+               if (intercepted == null) {
+                       intercepted = new HashMap<>();
+               }
+               intercepted.put(name, metric);
+               super.addMetric(name, metric);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 43eab24..7498518 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -93,7 +93,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
                                        inputWatermarkGauge);
                }
                headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
-               getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
+               // wrap watermark gauge since registered metrics must be unique
+               getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index c105ad7..015b7db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -381,8 +381,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                        currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
                }
 
-               chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge());
-               chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge());
+               // wrap watermark gauges since registered metrics must be unique
+               chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
+               chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);
 
                return currentOperatorOutput;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 93a5675..546ccdb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -105,7 +105,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
                headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge);
                headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge);
                headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, input2WatermarkGauge);
-               getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge);
+               // wrap watermark gauge since registered metrics must be unique
+               getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge::getValue);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index af776d5..201e138 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -64,6 +65,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -678,7 +680,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
                InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
-               TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
+               InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
                        @Override
                        public OperatorMetricGroup addOperator(OperatorID id, String name) {
                                if (id.equals(headOperatorId)) {
@@ -702,11 +704,23 @@ public class OneInputStreamTaskTest extends TestLogger {
                testHarness.invoke(env);
                testHarness.waitForTaskRunning();
 
+               Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
                Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
                Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
                Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
                Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
 
+               Assert.assertEquals("A metric was registered multiple times.",
+                       5,
+                       new HashSet<>(Arrays.asList(
+                               taskInputWatermarkGauge,
+                               headInputWatermarkGauge,
+                               headOutputWatermarkGauge,
+                               chainedInputWatermarkGauge,
+                               chainedOutputWatermarkGauge))
+                               .size());
+
+               Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
@@ -714,6 +728,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                testHarness.processElement(new Watermark(1L));
                testHarness.waitForInputProcessing();
+               Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
@@ -721,6 +736,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                testHarness.processElement(new Watermark(2L));
                testHarness.waitForInputProcessing();
+               Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(4L, headOutputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(4L, chainedInputWatermarkGauge.getValue().longValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 38b262c..5d15157 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -49,6 +50,8 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -465,7 +468,7 @@ public class TwoInputStreamTaskTest {
 
                InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
                InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
-               TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
+               InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
                        @Override
                        public OperatorMetricGroup addOperator(OperatorID id, String name) {
                                if (id.equals(headOperatorId)) {
@@ -489,6 +492,7 @@ public class TwoInputStreamTaskTest {
                testHarness.invoke(env);
                testHarness.waitForTaskRunning();
 
+               Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
                Gauge<Long> headInput1WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_1_WATERMARK);
                Gauge<Long> headInput2WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_2_WATERMARK);
                Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
@@ -496,6 +500,19 @@ public class TwoInputStreamTaskTest {
                Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
                Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
 
+               Assert.assertEquals("A metric was registered multiple times.",
+                       7,
+                       new HashSet<>(Arrays.asList(
+                               taskInputWatermarkGauge,
+                               headInput1WatermarkGauge,
+                               headInput2WatermarkGauge,
+                               headInputWatermarkGauge,
+                               headOutputWatermarkGauge,
+                               chainedInputWatermarkGauge,
+                               chainedOutputWatermarkGauge))
+                               .size());
+
+               Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(Long.MIN_VALUE, headInput1WatermarkGauge.getValue().longValue());
                Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
@@ -505,6 +522,7 @@ public class TwoInputStreamTaskTest {
 
                testHarness.processElement(new Watermark(1L), 0, 0);
                testHarness.waitForInputProcessing();
+               Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
                Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
@@ -514,6 +532,7 @@ public class TwoInputStreamTaskTest {
 
                testHarness.processElement(new Watermark(2L), 1, 0);
                testHarness.waitForInputProcessing();
+               Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
                Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
@@ -523,6 +542,7 @@ public class TwoInputStreamTaskTest {
 
                testHarness.processElement(new Watermark(3L), 0, 0);
                testHarness.waitForInputProcessing();
+               Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
                Assert.assertEquals(3L, headInput1WatermarkGauge.getValue().longValue());
                Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());

Reply via email to