[FLINK-9789][metrics] Ensure uniqueness of watermark metrics This closes #6292.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60df251a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60df251a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60df251a Branch: refs/heads/master Commit: 60df251ad34ac033ed6c4423a69765739cb04199 Parents: 2fbe562 Author: zentol <ches...@apache.org> Authored: Tue Jul 10 13:27:34 2018 +0200 Committer: zentol <ches...@apache.org> Committed: Wed Jul 11 12:05:07 2018 +0200 ---------------------------------------------------------------------- .../util/InterceptingTaskMetricGroup.java | 53 ++++++++++++++++++++ .../runtime/tasks/OneInputStreamTask.java | 3 +- .../streaming/runtime/tasks/OperatorChain.java | 5 +- .../runtime/tasks/TwoInputStreamTask.java | 3 +- .../runtime/tasks/OneInputStreamTaskTest.java | 18 ++++++- .../runtime/tasks/TwoInputStreamTaskTest.java | 22 +++++++- 6 files changed, 98 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java new file mode 100644 index 0000000..29454b4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.util; + +import org.apache.flink.metrics.Metric; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; + +import java.util.HashMap; +import java.util.Map; + +/** + * A {@link TaskMetricGroup} that exposes all registered metrics. + */ +public class InterceptingTaskMetricGroup extends UnregisteredMetricGroups.UnregisteredTaskMetricGroup { + + private Map<String, Metric> intercepted; + + /** + * Returns the registered metric for the given name, or null if it was never registered. + * + * @param name metric name + * @return registered metric for the given name, or null if it was never registered + */ + public Metric get(String name) { + return intercepted.get(name); + } + + @Override + protected void addMetric(String name, Metric metric) { + if (intercepted == null) { + intercepted = new HashMap<>(); + } + intercepted.put(name, metric); + super.addMetric(name, metric); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 43eab24..7498518 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -93,7 +93,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO inputWatermarkGauge); } headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge); - getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge); + // wrap watermark gauge since registered metrics must be unique + getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index c105ad7..015b7db 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -381,8 +381,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this); } - chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()); - chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()); + // wrap watermark gauges since registered metrics must be unique + chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue); + chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue); return currentOperatorOutput; } http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 93a5675..546ccdb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -105,7 +105,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge); headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge); headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, input2WatermarkGauge); - getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge); + // wrap watermark gauge since registered metrics must be unique + getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge::getValue); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index af776d5..201e138 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; +import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -64,6 +65,7 @@ import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -678,7 +680,7 @@ public class OneInputStreamTaskTest extends TestLogger { InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup(); InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup(); - TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() { + InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() { @Override public OperatorMetricGroup addOperator(OperatorID id, String name) { if (id.equals(headOperatorId)) { @@ -702,11 +704,23 @@ public class OneInputStreamTaskTest extends TestLogger { testHarness.invoke(env); testHarness.waitForTaskRunning(); + Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK); Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK); Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK); Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK); Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK); + Assert.assertEquals("A metric was registered multiple times.", + 5, + new HashSet<>(Arrays.asList( + taskInputWatermarkGauge, + headInputWatermarkGauge, + headOutputWatermarkGauge, + chainedInputWatermarkGauge, + chainedOutputWatermarkGauge)) + .size()); + + Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue()); Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue()); @@ -714,6 +728,7 @@ public class OneInputStreamTaskTest extends TestLogger { testHarness.processElement(new Watermark(1L)); testHarness.waitForInputProcessing(); + Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue()); Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue()); @@ -721,6 +736,7 @@ public class OneInputStreamTaskTest extends TestLogger { testHarness.processElement(new Watermark(2L)); testHarness.waitForInputProcessing(); + Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(4L, headOutputWatermarkGauge.getValue().longValue()); Assert.assertEquals(4L, chainedInputWatermarkGauge.getValue().longValue()); http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 38b262c..5d15157 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; +import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.streaming.api.functions.co.CoMapFunction; @@ -49,6 +50,8 @@ import org.apache.flink.streaming.util.TestHarnessUtil; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -465,7 +468,7 @@ public class TwoInputStreamTaskTest { InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup(); InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup(); - TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() { + InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() { @Override public OperatorMetricGroup addOperator(OperatorID id, String name) { if (id.equals(headOperatorId)) { @@ -489,6 +492,7 @@ public class TwoInputStreamTaskTest { testHarness.invoke(env); testHarness.waitForTaskRunning(); + Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK); Gauge<Long> headInput1WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_1_WATERMARK); Gauge<Long> headInput2WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_2_WATERMARK); Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK); @@ -496,6 +500,19 @@ public class TwoInputStreamTaskTest { Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK); Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK); + Assert.assertEquals("A metric was registered multiple times.", + 7, + new HashSet<>(Arrays.asList( + taskInputWatermarkGauge, + headInput1WatermarkGauge, + headInput2WatermarkGauge, + headInputWatermarkGauge, + headOutputWatermarkGauge, + chainedInputWatermarkGauge, + chainedOutputWatermarkGauge)) + .size()); + + Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(Long.MIN_VALUE, headInput1WatermarkGauge.getValue().longValue()); Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue()); @@ -505,6 +522,7 @@ public class TwoInputStreamTaskTest { testHarness.processElement(new Watermark(1L), 0, 0); testHarness.waitForInputProcessing(); + Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue()); Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue()); @@ -514,6 +532,7 @@ public class TwoInputStreamTaskTest { testHarness.processElement(new Watermark(2L), 1, 0); testHarness.waitForInputProcessing(); + Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue()); Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue()); @@ -523,6 +542,7 @@ public class TwoInputStreamTaskTest { testHarness.processElement(new Watermark(3L), 0, 0); testHarness.waitForInputProcessing(); + Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue()); Assert.assertEquals(3L, headInput1WatermarkGauge.getValue().longValue()); Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());