Repository: flink Updated Branches: refs/heads/master b5a7b3578 -> 92ad53e6a
[FLINK-4812][metrics] Expose currentLowWatermark for all operators Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92ad53e6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92ad53e6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92ad53e6 Branch: refs/heads/master Commit: 92ad53e6a7e4c9df7055aac89b49ae95d50c5b4c Parents: b5a7b35 Author: zentol <ches...@apache.org> Authored: Tue Dec 5 14:20:29 2017 +0100 Committer: zentol <ches...@apache.org> Committed: Fri Feb 2 16:12:07 2018 +0100 ---------------------------------------------------------------------- docs/monitoring/metrics.md | 40 +++++++-- .../flink/runtime/metrics/MetricNames.java | 5 ++ .../util/InterceptingOperatorMetricGroup.java | 53 +++++++++++ .../selector/CopyingDirectedOutput.java | 2 +- .../api/collector/selector/DirectedOutput.java | 17 +++- .../runtime/io/RecordWriterOutput.java | 13 ++- .../runtime/io/StreamInputProcessor.java | 36 ++------ .../runtime/io/StreamTwoInputProcessor.java | 42 +++------ .../runtime/metrics/MinWatermarkGauge.java | 40 +++++++++ .../runtime/metrics/WatermarkGauge.java | 38 ++++++++ .../runtime/tasks/OneInputStreamTask.java | 12 ++- .../streaming/runtime/tasks/OperatorChain.java | 63 ++++++++++--- .../runtime/tasks/TwoInputStreamTask.java | 22 ++++- .../runtime/metrics/MinWatermarkGaugeTest.java | 46 ++++++++++ .../runtime/metrics/WatermarkGaugeTest.java | 38 ++++++++ .../runtime/tasks/OneInputStreamTaskTest.java | 89 ++++++++++++++++++ .../runtime/tasks/TwoInputStreamTaskTest.java | 94 ++++++++++++++++++++ 17 files changed, 559 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/docs/monitoring/metrics.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 00c01b7..62adeb1 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -1190,12 +1190,7 @@ Thus, in order to infer the metric identifier: </thead> <tbody> <tr> - <th rowspan="7"><strong>Task</strong></th> - <td>currentLowWatermark</td> - <td>The lowest watermark this task has received (in milliseconds).</td> - <td>Gauge</td> - </tr> - <tr> + <th rowspan="6"><strong>Task</strong></th> <td>numBytesInLocal</td> <td>The total number of bytes this task has read from a local source.</td> <td>Counter</td> @@ -1252,7 +1247,38 @@ Thus, in order to infer the metric identifier: <td>Counter</td> </tr> <tr> - <th rowspan="2"><strong>Operator</strong></th> + <th rowspan="6"><strong>Operator</strong></th> + <td>currentInputWatermark</td> + <td> + The last watermark this operator has received (in milliseconds). + <p><strong>Note:</strong> For operators with 2 inputs this is the minimum of the last received watermarks.</p> + </td> + <td>Gauge</td> + </tr> + <tr> + <td>currentInput1Watermark</td> + <td> + The last watermark this operator has received in its first input (in milliseconds). + <p><strong>Note:</strong> Only for operators with 2 inputs.</p> + </td> + <td>Gauge</td> + </tr> + <tr> + <td>currentInput2Watermark</td> + <td> + The last watermark this operator has received in its second input (in milliseconds). + <p><strong>Note:</strong> Only for operators with 2 inputs.</p> + </td> + <td>Gauge</td> + </tr> + <tr> + <td>currentOutputWatermark</td> + <td> + The last watermark this operator has emitted (in milliseconds). + </td> + <td>Gauge</td> + </tr> + <tr> <td>latency</td> <td>The latency distributions from all incoming sources (in milliseconds).</td> <td>Histogram</td> http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java index 300b4b0..d15a0f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java @@ -39,4 +39,9 @@ public class MetricNames { public static final String IO_NUM_BYTES_IN_LOCAL_RATE = IO_NUM_BYTES_IN_LOCAL + SUFFIX_RATE; public static final String IO_NUM_BYTES_IN_REMOTE_RATE = IO_NUM_BYTES_IN_REMOTE + SUFFIX_RATE; public static final String IO_NUM_BYTES_OUT_RATE = IO_NUM_BYTES_OUT + SUFFIX_RATE; + + public static final String IO_CURRENT_INPUT_WATERMARK = "currentInputWatermark"; + public static final String IO_CURRENT_INPUT_1_WATERMARK = "currentInput1Watermark"; + public static final String IO_CURRENT_INPUT_2_WATERMARK = "currentInput2Watermark"; + public static final String IO_CURRENT_OUTPUT_WATERMARK = "currentOutputWatermark"; } http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingOperatorMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingOperatorMetricGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingOperatorMetricGroup.java new file mode 100644 index 0000000..f4d0581 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingOperatorMetricGroup.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.util; + +import org.apache.flink.metrics.Metric; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; + +import java.util.HashMap; +import java.util.Map; + +/** + * An {@link OperatorMetricGroup} that exposes all registered metrics. + */ +public class InterceptingOperatorMetricGroup extends UnregisteredMetricGroups.UnregisteredOperatorMetricGroup { + + private Map<String, Metric> intercepted; + + /** + * Returns the registered metric for the given name, or null if it was never registered. + * + * @param name metric name + * @return registered metric for the given name, or null if it was never registered + */ + public Metric get(String name) { + return intercepted.get(name); + } + + @Override + protected void addMetric(String name, Metric metric) { + if (intercepted == null) { + intercepted = new HashMap<>(); + } + intercepted.put(name, metric); + super.addMetric(name, metric); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java index 5f7e278..3533cd1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java @@ -36,7 +36,7 @@ public class CopyingDirectedOutput<OUT> extends DirectedOutput<OUT> { @SuppressWarnings({"unchecked", "rawtypes"}) public CopyingDirectedOutput( List<OutputSelector<OUT>> outputSelectors, - List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> outputs) { + List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs) { super(outputSelectors, outputs); } http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java index ec4700f..6512174 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java @@ -18,11 +18,14 @@ package org.apache.flink.streaming.api.collector.selector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.Gauge; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorChain; import org.apache.flink.util.OutputTag; import org.apache.flink.util.XORShiftRandom; @@ -39,7 +42,7 @@ import java.util.Set; * Wrapping {@link Output} that forwards to other {@link Output Outputs } based on a list of * {@link OutputSelector OutputSelectors}. */ -public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> { +public class DirectedOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> { protected final OutputSelector<OUT>[] outputSelectors; @@ -51,10 +54,12 @@ public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> { private final Random random = new XORShiftRandom(); + protected final WatermarkGauge watermarkGauge = new WatermarkGauge(); + @SuppressWarnings({"unchecked", "rawtypes"}) public DirectedOutput( List<OutputSelector<OUT>> outputSelectors, - List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> outputs) { + List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs) { this.outputSelectors = outputSelectors.toArray(new OutputSelector[outputSelectors.size()]); this.allOutputs = new Output[outputs.size()]; @@ -65,7 +70,7 @@ public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> { HashSet<Output<StreamRecord<OUT>>> selectAllOutputs = new HashSet<Output<StreamRecord<OUT>>>(); HashMap<String, ArrayList<Output<StreamRecord<OUT>>>> outputMap = new HashMap<String, ArrayList<Output<StreamRecord<OUT>>>>(); - for (Tuple2<Output<StreamRecord<OUT>>, StreamEdge> outputPair : outputs) { + for (Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge> outputPair : outputs) { final Output<StreamRecord<OUT>> output = outputPair.f0; final StreamEdge edge = outputPair.f1; @@ -100,6 +105,7 @@ public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> { @Override public void emitWatermark(Watermark mark) { + watermarkGauge.setCurrentWatermark(mark.getTimestamp()); for (Output<StreamRecord<OUT>> out : allOutputs) { out.emitWatermark(mark); } @@ -149,4 +155,9 @@ public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> { out.close(); } } + + @Override + public Gauge<Long> getWatermarkGauge() { + return watermarkGauge; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 3b70be7..45bbd66 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -19,17 +19,20 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider; +import org.apache.flink.streaming.runtime.tasks.OperatorChain; import org.apache.flink.util.OutputTag; import java.io.IOException; @@ -40,7 +43,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * Implementation of {@link Output} that sends data using a {@link RecordWriter}. */ @Internal -public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> { +public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> { private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter; @@ -50,6 +53,8 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> { private final OutputTag outputTag; + private final WatermarkGauge watermarkGauge = new WatermarkGauge(); + @SuppressWarnings("unchecked") public RecordWriterOutput( StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter, @@ -108,6 +113,7 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> { @Override public void emitWatermark(Watermark mark) { + watermarkGauge.setCurrentWatermark(mark.getTimestamp()); serializationDelegate.setInstance(mark); if (streamStatusProvider.getStreamStatus().isActive()) { @@ -158,4 +164,9 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> { public void clearBuffers() { recordWriter.clearBuffers(); } + + @Override + public Gauge<Long> getWatermarkGauge() { + return watermarkGauge; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 9f526a5..dc3dc5c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -42,6 +41,7 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -106,7 +106,7 @@ public class StreamInputProcessor<IN> { // ---------------- Metrics ------------------ - private long lastEmittedWatermark; + private final WatermarkGauge watermarkGauge; private Counter numRecordsIn; private boolean isFinished; @@ -121,7 +121,9 @@ public class StreamInputProcessor<IN> { IOManager ioManager, Configuration taskManagerConfig, StreamStatusMaintainer streamStatusMaintainer, - OneInputStreamOperator<IN, ?> streamOperator) throws IOException { + OneInputStreamOperator<IN, ?> streamOperator, + TaskIOMetricGroup metrics, + WatermarkGauge watermarkGauge) throws IOException { InputGate inputGate = InputGateUtil.createInputGate(inputGates); @@ -160,14 +162,15 @@ public class StreamInputProcessor<IN> { this.numInputChannels = inputGate.getNumberOfInputChannels(); - this.lastEmittedWatermark = Long.MIN_VALUE; - this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer); this.streamOperator = checkNotNull(streamOperator); this.statusWatermarkValve = new StatusWatermarkValve( numInputChannels, new ForwardingValveOutputHandler(streamOperator, lock)); + + this.watermarkGauge = watermarkGauge; + metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos); } public boolean processInput() throws Exception { @@ -247,27 +250,6 @@ public class StreamInputProcessor<IN> { } } - /** - * Sets the metric group for this StreamInputProcessor. - * - * @param metrics metric group - */ - public void setMetricGroup(TaskIOMetricGroup metrics) { - metrics.gauge("currentLowWatermark", new Gauge<Long>() { - @Override - public Long getValue() { - return lastEmittedWatermark; - } - }); - - metrics.gauge("checkpointAlignmentTime", new Gauge<Long>() { - @Override - public Long getValue() { - return barrierHandler.getAlignmentDurationNanos(); - } - }); - } - public void cleanup() throws IOException { // clear the buffers first. this part should not ever fail for (RecordDeserializer<?> deserializer : recordDeserializers) { @@ -295,7 +277,7 @@ public class StreamInputProcessor<IN> { public void handleWatermark(Watermark watermark) { try { synchronized (lock) { - lastEmittedWatermark = watermark.getTimestamp(); + watermarkGauge.setCurrentWatermark(watermark.getTimestamp()); operator.processWatermark(watermark); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index a3d9236..494a82a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -42,6 +41,7 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -120,8 +120,8 @@ public class StreamTwoInputProcessor<IN1, IN2> { // ---------------- Metrics ------------------ - private long lastEmittedWatermark1; - private long lastEmittedWatermark2; + private final WatermarkGauge input1WatermarkGauge; + private final WatermarkGauge input2WatermarkGauge; private Counter numRecordsIn; @@ -139,7 +139,10 @@ public class StreamTwoInputProcessor<IN1, IN2> { IOManager ioManager, Configuration taskManagerConfig, StreamStatusMaintainer streamStatusMaintainer, - TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws IOException { + TwoInputStreamOperator<IN1, IN2, ?> streamOperator, + TaskIOMetricGroup metrics, + WatermarkGauge input1WatermarkGauge, + WatermarkGauge input2WatermarkGauge) throws IOException { final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2); @@ -188,9 +191,6 @@ public class StreamTwoInputProcessor<IN1, IN2> { this.numInputChannels1 = numInputChannels1; this.numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1; - this.lastEmittedWatermark1 = Long.MIN_VALUE; - this.lastEmittedWatermark2 = Long.MIN_VALUE; - this.firstStatus = StreamStatus.ACTIVE; this.secondStatus = StreamStatus.ACTIVE; @@ -200,6 +200,9 @@ public class StreamTwoInputProcessor<IN1, IN2> { this.statusWatermarkValve1 = new StatusWatermarkValve(numInputChannels1, new ForwardingValveOutputHandler1(streamOperator, lock)); this.statusWatermarkValve2 = new StatusWatermarkValve(numInputChannels2, new ForwardingValveOutputHandler2(streamOperator, lock)); + this.input1WatermarkGauge = input1WatermarkGauge; + this.input2WatermarkGauge = input2WatermarkGauge; + metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos); } public boolean processInput() throws Exception { @@ -312,27 +315,6 @@ public class StreamTwoInputProcessor<IN1, IN2> { } } - /** - * Sets the metric group for this StreamTwoInputProcessor. - * - * @param metrics metric group - */ - public void setMetricGroup(TaskIOMetricGroup metrics) { - metrics.gauge("currentLowWatermark", new Gauge<Long>() { - @Override - public Long getValue() { - return Math.min(lastEmittedWatermark1, lastEmittedWatermark2); - } - }); - - metrics.gauge("checkpointAlignmentTime", new Gauge<Long>() { - @Override - public Long getValue() { - return barrierHandler.getAlignmentDurationNanos(); - } - }); - } - public void cleanup() throws IOException { // clear the buffers first. this part should not ever fail for (RecordDeserializer<?> deserializer : recordDeserializers) { @@ -360,7 +342,7 @@ public class StreamTwoInputProcessor<IN1, IN2> { public void handleWatermark(Watermark watermark) { try { synchronized (lock) { - lastEmittedWatermark1 = watermark.getTimestamp(); + input1WatermarkGauge.setCurrentWatermark(watermark.getTimestamp()); operator.processWatermark1(watermark); } } catch (Exception e) { @@ -404,7 +386,7 @@ public class StreamTwoInputProcessor<IN1, IN2> { public void handleWatermark(Watermark watermark) { try { synchronized (lock) { - lastEmittedWatermark2 = watermark.getTimestamp(); + input2WatermarkGauge.setCurrentWatermark(watermark.getTimestamp()); operator.processWatermark2(watermark); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGauge.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGauge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGauge.java new file mode 100644 index 0000000..6736dca --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGauge.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.metrics; + +import org.apache.flink.metrics.Gauge; + +/** + * A {@link Gauge} for exposing the minimum watermark of a {@link WatermarkGauge} pair. + */ +public class MinWatermarkGauge implements Gauge<Long> { + + private WatermarkGauge watermarkGauge1; + private WatermarkGauge watermarkGauge2; + + public MinWatermarkGauge(WatermarkGauge watermarkGauge1, WatermarkGauge watermarkGauge2) { + this.watermarkGauge1 = watermarkGauge1; + this.watermarkGauge2 = watermarkGauge2; + } + + @Override + public Long getValue() { + return Math.min(watermarkGauge1.getValue(), watermarkGauge2.getValue()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/WatermarkGauge.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/WatermarkGauge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/WatermarkGauge.java new file mode 100644 index 0000000..42c7000 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/metrics/WatermarkGauge.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.metrics; + +import org.apache.flink.metrics.Gauge; + +/** + * A {@link Gauge} for exposing the current input/output watermark. + */ +public class WatermarkGauge implements Gauge<Long> { + + private long currentWatermark = Long.MIN_VALUE; + + public void setCurrentWatermark(long watermark) { + this.currentWatermark = watermark; + } + + @Override + public Long getValue() { + return currentWatermark; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index bc8aaae..26088e4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -23,9 +23,11 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.io.StreamInputProcessor; +import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; import javax.annotation.Nullable; @@ -39,6 +41,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO private volatile boolean running = true; + private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge(); + /** * Constructor for initialization, possibly with initial state (recovery / savepoint / etc). * @@ -84,11 +88,11 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO getEnvironment().getIOManager(), getEnvironment().getTaskManagerInfo().getConfiguration(), getStreamStatusMaintainer(), - this.headOperator); - - // make sure that stream tasks report their I/O statistics - inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); + this.headOperator, + getEnvironment().getMetricGroup().getIOMetricGroup(), + inputWatermarkGauge); } + headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 141a623..fdeea17 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -21,12 +21,14 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.plugable.SerializationDelegate; @@ -41,6 +43,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.StreamRecordWriter; +import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; @@ -77,7 +80,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea private final RecordWriterOutput<?>[] streamOutputs; - private final Output<StreamRecord<OUT>> chainEntryPoint; + private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint; private final OP headOperator; @@ -134,8 +137,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea allOps); if (headOperator != null) { - Output output = getChainEntryPoint(); + WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint(); headOperator.setup(containingTask, configuration, output); + + headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge()); } // add head operator to end of chain @@ -209,7 +214,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea return allOperators; } - public Output<StreamRecord<OUT>> getChainEntryPoint() { + public WatermarkGaugeExposingOutput<StreamRecord<OUT>> getChainEntryPoint() { return chainEntryPoint; } @@ -259,21 +264,21 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea // initialization utilities // ------------------------------------------------------------------------ - private <T> Output<StreamRecord<T>> createOutputCollector( + private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector( StreamTask<?, ?> containingTask, StreamConfig operatorConfig, Map<Integer, StreamConfig> chainedConfigs, ClassLoader userCodeClassloader, Map<StreamEdge, RecordWriterOutput<?>> streamOutputs, List<StreamOperator<?>> allOperators) { - List<Tuple2<Output<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4); + List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4); // create collectors for the network outputs for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) { @SuppressWarnings("unchecked") RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge); - allOutputs.add(new Tuple2<Output<StreamRecord<T>>, StreamEdge>(output, outputEdge)); + allOutputs.add(new Tuple2<>(output, outputEdge)); } // Create collectors for the chained outputs @@ -281,7 +286,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea int outputId = outputEdge.getTargetId(); StreamConfig chainedOpConfig = chainedConfigs.get(outputId); - Output<StreamRecord<T>> output = createChainedOperator( + WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator( containingTask, chainedOpConfig, chainedConfigs, @@ -336,7 +341,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea } } - private <IN, OUT> Output<StreamRecord<IN>> createChainedOperator( + private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator( StreamTask<?, ?> containingTask, StreamConfig operatorConfig, Map<Integer, StreamConfig> chainedConfigs, @@ -345,7 +350,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea List<StreamOperator<?>> allOperators, OutputTag<IN> outputTag) { // create the output that the operator writes to first. this may recursively create more operators - Output<StreamRecord<OUT>> output = createOutputCollector( + WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector( containingTask, operatorConfig, chainedConfigs, @@ -356,17 +361,23 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea // now create the operator and give it the output collector to write its output to OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader); - chainedOperator.setup(containingTask, operatorConfig, output); + chainedOperator.setup(containingTask, operatorConfig, chainedOperatorOutput); allOperators.add(chainedOperator); + WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput; if (containingTask.getExecutionConfig().isObjectReuseEnabled()) { - return new ChainingOutput<>(chainedOperator, this, outputTag); + currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag); } else { TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader); - return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this); + currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this); } + + chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()); + chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()); + + return currentOperatorOutput; } private <T> RecordWriterOutput<T> createStreamOutput( @@ -414,10 +425,20 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea // Collectors for output chaining // ------------------------------------------------------------------------ - private static class ChainingOutput<T> implements Output<StreamRecord<T>> { + /** + * An {@link Output} that measures the last emitted watermark with a {@link WatermarkGauge}. + * + * @param <T> The type of the elements that can be emitted. + */ + public interface WatermarkGaugeExposingOutput<T> extends Output<T> { + Gauge<Long> getWatermarkGauge(); + } + + private static class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> { protected final OneInputStreamOperator<T, ?> operator; protected final Counter numRecordsIn; + protected final WatermarkGauge watermarkGauge = new WatermarkGauge(); protected final StreamStatusProvider streamStatusProvider; @@ -487,6 +508,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea @Override public void emitWatermark(Watermark mark) { try { + watermarkGauge.setCurrentWatermark(mark.getTimestamp()); if (streamStatusProvider.getStreamStatus().isActive()) { operator.processWatermark(mark); } @@ -515,6 +537,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea throw new ExceptionInChainedOperatorException(e); } } + + @Override + public Gauge<Long> getWatermarkGauge() { + return watermarkGauge; + } } private static final class CopyingChainingOutput<T> extends ChainingOutput<T> { @@ -582,7 +609,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea } } - private static class BroadcastingOutputCollector<T> implements Output<StreamRecord<T>> { + private static class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> { protected final Output<StreamRecord<T>>[] outputs; @@ -590,6 +617,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea private final StreamStatusProvider streamStatusProvider; + private final WatermarkGauge watermarkGauge = new WatermarkGauge(); + public BroadcastingOutputCollector( Output<StreamRecord<T>>[] outputs, StreamStatusProvider streamStatusProvider) { @@ -599,6 +628,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea @Override public void emitWatermark(Watermark mark) { + watermarkGauge.setCurrentWatermark(mark.getTimestamp()); if (streamStatusProvider.getStreamStatus().isActive()) { for (Output<StreamRecord<T>> output : outputs) { output.emitWatermark(mark); @@ -619,6 +649,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea } @Override + public Gauge<Long> getWatermarkGauge() { + return watermarkGauge; + } + + @Override public void collect(StreamRecord<T> record) { for (Output<StreamRecord<T>> output : outputs) { output.collect(record); http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index c1ba7b2..bd878f6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -21,10 +21,13 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor; +import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge; +import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; import java.util.ArrayList; import java.util.List; @@ -39,6 +42,10 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS private volatile boolean running = true; + private final WatermarkGauge input1WatermarkGauge; + private final WatermarkGauge input2WatermarkGauge; + private final MinWatermarkGauge minInputWatermarkGauge; + /** * Constructor for initialization, possibly with initial state (recovery / savepoint / etc). * @@ -46,6 +53,9 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS */ public TwoInputStreamTask(Environment env) { super(env); + input1WatermarkGauge = new WatermarkGauge(); + input2WatermarkGauge = new WatermarkGauge(); + minInputWatermarkGauge = new MinWatermarkGauge(input1WatermarkGauge, input2WatermarkGauge); } @Override @@ -87,10 +97,14 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS getEnvironment().getIOManager(), getEnvironment().getTaskManagerInfo().getConfiguration(), getStreamStatusMaintainer(), - this.headOperator); - - // make sure that stream tasks report their I/O statistics - inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); + this.headOperator, + getEnvironment().getMetricGroup().getIOMetricGroup(), + input1WatermarkGauge, + input2WatermarkGauge); + + headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge); + headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge); + headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, input2WatermarkGauge); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGaugeTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGaugeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGaugeTest.java new file mode 100644 index 0000000..ae673a8 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/MinWatermarkGaugeTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.metrics; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for the {@link MinWatermarkGauge}. + */ +public class MinWatermarkGaugeTest { + + @Test + public void testSetCurrentLowWatermark() { + WatermarkGauge metric1 = new WatermarkGauge(); + WatermarkGauge metric2 = new WatermarkGauge(); + MinWatermarkGauge metric = new MinWatermarkGauge(metric1, metric2); + + Assert.assertEquals(Long.MIN_VALUE, metric.getValue().longValue()); + + metric1.setCurrentWatermark(1); + Assert.assertEquals(Long.MIN_VALUE, metric.getValue().longValue()); + + metric2.setCurrentWatermark(2); + Assert.assertEquals(1L, metric.getValue().longValue()); + + metric1.setCurrentWatermark(3); + Assert.assertEquals(2L, metric.getValue().longValue()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/WatermarkGaugeTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/WatermarkGaugeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/WatermarkGaugeTest.java new file mode 100644 index 0000000..c8cec88 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/metrics/WatermarkGaugeTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.metrics; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for the {@link WatermarkGauge}. + */ +public class WatermarkGaugeTest { + + @Test + public void testSetCurrentLowWatermark() { + WatermarkGauge metric = new WatermarkGauge(); + + Assert.assertEquals(Long.MIN_VALUE, metric.getValue().longValue()); + + metric.setCurrentWatermark(64); + Assert.assertEquals(64, metric.getValue().longValue()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 9a7b699..3e0459d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ListState; @@ -28,12 +29,19 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.TestTaskStateManager; @@ -603,6 +611,87 @@ public class OneInputStreamTaskTest extends TestLogger { } } + @Test + public void testWatermarkMetrics() throws Exception { + final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + OneInputStreamOperator<String, String> headOperator = new WatermarkMetricOperator(); + OperatorID headOperatorId = new OperatorID(); + + OneInputStreamOperator<String, String> chainedOperator = new WatermarkMetricOperator(); + OperatorID chainedOperatorId = new OperatorID(); + + testHarness.setupOperatorChain(headOperatorId, headOperator) + .chain(chainedOperatorId, chainedOperator, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())) + .finish(); + + InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup(); + InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup(); + TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() { + @Override + public OperatorMetricGroup addOperator(OperatorID id, String name) { + if (id.equals(headOperatorId)) { + return headOperatorMetricGroup; + } else if (id.equals(chainedOperatorId)) { + return chainedOperatorMetricGroup; + } else { + return super.addOperator(id, name); + } + } + }; + + StreamMockEnvironment env = new StreamMockEnvironment( + testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, new TestTaskStateManager()) { + @Override + public TaskMetricGroup getMetricGroup() { + return taskMetricGroup; + } + }; + + testHarness.invoke(env); + testHarness.waitForTaskRunning(); + + Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK); + Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK); + Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK); + Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK); + + Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue()); + + testHarness.processElement(new Watermark(1L)); + testHarness.waitForInputProcessing(); + Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(4L, chainedOutputWatermarkGauge.getValue().longValue()); + + testHarness.processElement(new Watermark(2L)); + testHarness.waitForInputProcessing(); + Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(4L, headOutputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(4L, chainedInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(8L, chainedOutputWatermarkGauge.getValue().longValue()); + + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + } + + static class WatermarkMetricOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> { + + @Override + public void processElement(StreamRecord<String> element) throws Exception { + output.collect(element); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + output.emitWatermark(new Watermark(mark.getTimestamp() * 2)); + } + } + //============================================================================================== // Utility functions and classes //============================================================================================== http://git-wip-us.apache.org/repos/asf/flink/blob/92ad53e6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index a267c5a..58e28b3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -18,12 +18,21 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.co.RichCoMapFunction; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -373,6 +382,91 @@ public class TwoInputStreamTaskTest { testHarness.getOutput()); } + @Test + public void testWatermarkMetrics() throws Exception { + final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + CoStreamMap<String, Integer, String> headOperator = new CoStreamMap<>(new IdentityMap()); + final OperatorID headOperatorId = new OperatorID(); + + OneInputStreamTaskTest.WatermarkMetricOperator chainedOperator = new OneInputStreamTaskTest.WatermarkMetricOperator(); + OperatorID chainedOperatorId = new OperatorID(); + + testHarness.setupOperatorChain(headOperatorId, headOperator) + .chain(chainedOperatorId, chainedOperator, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())) + .finish(); + + InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup(); + InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup(); + TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() { + @Override + public OperatorMetricGroup addOperator(OperatorID id, String name) { + if (id.equals(headOperatorId)) { + return headOperatorMetricGroup; + } else if (id.equals(chainedOperatorId)) { + return chainedOperatorMetricGroup; + } else { + return super.addOperator(id, name); + } + } + }; + + StreamMockEnvironment env = new StreamMockEnvironment( + testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, new TestTaskStateManager()) { + @Override + public TaskMetricGroup getMetricGroup() { + return taskMetricGroup; + } + }; + + testHarness.invoke(env); + testHarness.waitForTaskRunning(); + + Gauge<Long> headInput1WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_1_WATERMARK); + Gauge<Long> headInput2WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_2_WATERMARK); + Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK); + Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK); + Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK); + Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK); + + Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(Long.MIN_VALUE, headInput1WatermarkGauge.getValue().longValue()); + Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue()); + Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue()); + + testHarness.processElement(new Watermark(1L), 0, 0); + testHarness.waitForInputProcessing(); + Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue()); + Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue()); + Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue()); + + testHarness.processElement(new Watermark(2L), 1, 0); + testHarness.waitForInputProcessing(); + Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue()); + Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue()); + Assert.assertEquals(1L, headOutputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(1L, chainedInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(2L, chainedOutputWatermarkGauge.getValue().longValue()); + + testHarness.processElement(new Watermark(3L), 0, 0); + testHarness.waitForInputProcessing(); + Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(3L, headInput1WatermarkGauge.getValue().longValue()); + Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue()); + Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue()); + Assert.assertEquals(4L, chainedOutputWatermarkGauge.getValue().longValue()); + + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + } + // This must only be used in one test, otherwise the static fields will be changed // by several tests concurrently private static class TestOpenCloseMapFunction extends RichCoMapFunction<String, Integer, String> {