[FLINK-3949] Collect metrics in runtime operators: currentLowWatermark, lastCheckpointSize, numBytesOut, numBytesIn(Local/Remote), numRecords(In/Out), numSplitsProcessed (batch only)
This closes #2090 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c78b3c49 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c78b3c49 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c78b3c49 Branch: refs/heads/master Commit: c78b3c49e0e82874cbfa71e88bf28b99ed395610 Parents: 1049585 Author: zentol <ches...@apache.org> Authored: Wed Jun 1 11:24:40 2016 +0200 Committer: zentol <ches...@apache.org> Committed: Thu Jun 16 12:24:57 2016 +0200 ---------------------------------------------------------------------- .../flink/metrics/groups/IOMetricGroup.java | 22 +++------ .../api/reader/AbstractRecordReader.java | 8 --- .../io/network/api/reader/BufferReader.java | 5 -- .../io/network/api/reader/ReaderBase.java | 8 --- .../AdaptiveSpanningRecordDeserializer.java | 20 -------- .../api/serialization/RecordDeserializer.java | 8 --- .../serialization/SpanningRecordSerializer.java | 6 --- ...llingAdaptiveSpanningRecordDeserializer.java | 20 -------- .../partition/consumer/InputChannel.java | 8 ++- .../partition/consumer/LocalInputChannel.java | 12 +++-- .../partition/consumer/RemoteInputChannel.java | 12 +++-- .../partition/consumer/SingleInputGate.java | 20 +++++--- .../partition/consumer/UnknownInputChannel.java | 13 +++-- .../AbstractCachedBuildSideJoinDriver.java | 12 +++-- .../operators/AbstractOuterJoinDriver.java | 14 ++++-- .../operators/AllGroupCombineDriver.java | 10 +++- .../runtime/operators/AllReduceDriver.java | 14 +++++- .../flink/runtime/operators/BatchTask.java | 1 - .../flink/runtime/operators/CoGroupDriver.java | 13 +++-- .../flink/runtime/operators/CrossDriver.java | 46 ++++++++++++------ .../flink/runtime/operators/DataSinkTask.java | 11 +++-- .../flink/runtime/operators/DataSourceTask.java | 12 ++++- .../flink/runtime/operators/FlatMapDriver.java | 8 ++- .../runtime/operators/GroupReduceDriver.java | 10 +++- .../flink/runtime/operators/JoinDriver.java | 12 +++-- 
.../flink/runtime/operators/MapDriver.java | 8 ++- .../runtime/operators/MapPartitionDriver.java | 9 +++- .../runtime/operators/NoOpChainedDriver.java | 1 + .../flink/runtime/operators/NoOpDriver.java | 8 ++- .../runtime/operators/ReduceCombineDriver.java | 10 +++- .../flink/runtime/operators/ReduceDriver.java | 11 ++++- .../operators/UnionWithTempOperator.java | 8 ++- .../chaining/ChainedAllReduceDriver.java | 1 + .../operators/chaining/ChainedDriver.java | 12 ++++- .../chaining/ChainedFlatMapDriver.java | 1 + .../operators/chaining/ChainedMapDriver.java | 1 + .../ChainedTerminationCriterionDriver.java | 1 + .../chaining/GroupCombineChainedDriver.java | 1 + .../SynchronousChainedCombineDriver.java | 1 + .../util/metrics/CountingCollector.java | 42 ++++++++++++++++ .../util/metrics/CountingIterable.java | 38 +++++++++++++++ .../util/metrics/CountingIterator.java | 48 ++++++++++++++++++ .../metrics/CountingMutableObjectIterator.java | 51 ++++++++++++++++++++ .../apache/flink/runtime/taskmanager/Task.java | 3 +- .../network/api/reader/AbstractReaderTest.java | 5 -- .../partition/consumer/InputChannelTest.java | 3 +- .../consumer/LocalInputChannelTest.java | 10 ++-- .../consumer/RemoteInputChannelTest.java | 10 ++-- .../partition/consumer/SingleInputGateTest.java | 19 +++++--- .../partition/consumer/TestSingleInputGate.java | 3 +- .../partition/consumer/UnionInputGateTest.java | 5 +- .../testutils/UnregisteredTaskMetricsGroup.java | 16 ++++++ .../api/operators/AbstractStreamOperator.java | 30 +++++++++++- .../streaming/api/operators/StreamSource.java | 4 ++ .../runtime/io/StreamInputProcessor.java | 17 +++++-- .../runtime/io/StreamTwoInputProcessor.java | 10 ++-- .../streaming/runtime/tasks/OperatorChain.java | 5 ++ .../runtime/tasks/SourceStreamTask.java | 2 +- .../runtime/tasks/StreamIterationTail.java | 42 ++++++++++++---- .../streaming/runtime/tasks/StreamTask.java | 12 +++++ 60 files changed, 573 insertions(+), 200 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java index b34c844..46bf2af 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java @@ -26,33 +26,27 @@ import org.apache.flink.metrics.MetricRegistry; */ public class IOMetricGroup extends AbstractMetricGroup { - private final Counter numBytesIn; private final Counter numBytesOut; - private final Counter numRecordsIn; - private final Counter numRecordsOut; + private final Counter numBytesInLocal; + private final Counter numBytesInRemote; public IOMetricGroup(MetricRegistry registry, TaskMetricGroup parent) { super(registry, parent.getScopeComponents()); - - this.numBytesIn = parent.counter("numBytesIn"); this.numBytesOut = parent.counter("numBytesOut"); - this.numRecordsIn = parent.counter("numRecordsIn"); - this.numRecordsOut = parent.counter("numRecordsOut"); - } - public Counter getBytesInCounter() { - return numBytesIn; + this.numBytesInLocal = parent.counter("numBytesInLocal"); + this.numBytesInRemote = parent.counter("numBytesInRemote"); } public Counter getBytesOutCounter() { return numBytesOut; } - public Counter getRecordsInCounter() { - return numRecordsIn; + public Counter getNumBytesInLocalCounter() { + return numBytesInLocal; } - public Counter getRecordsOutCounter() { - return numRecordsOut; + public Counter getNumBytesInRemoteCounter() { + return numBytesInRemote; } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index 48ac558..e0fe355 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.api.reader; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; @@ -131,11 +130,4 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra deserializer.setReporter(reporter); } } - - @Override - public void setMetricGroup(IOMetricGroup metrics) { - for (RecordDeserializer<?> deserializer : recordDeserializers) { - deserializer.instantiateMetrics(metrics); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java index e5f5930..debb352 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.api.reader; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; @@ -54,8 +53,4 @@ public final class BufferReader extends AbstractReader { public void setReporter(AccumulatorRegistry.Reporter reporter) { } - - @Override - public void setMetricGroup(IOMetricGroup metrics) { - } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java index 192a9ab..a1d705f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader; import java.io.IOException; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.util.event.EventListener; @@ -58,11 +57,4 @@ public interface ReaderBase { */ void setReporter(AccumulatorRegistry.Reporter reporter); - /** - * Setter for the metric group. 
- * - * @param metrics metric group to set - */ - void setMetricGroup(IOMetricGroup metrics); - } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java index 1c17476..cdd8731 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java @@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.DataInputDeserializer; @@ -49,9 +47,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im private Buffer currentBuffer; private AccumulatorRegistry.Reporter reporter; - - private transient Counter numRecordsIn; - private transient Counter numBytesIn; public AdaptiveSpanningRecordDeserializer() { this.nonSpanningWrapper = new NonSpanningWrapper(); @@ -101,9 +96,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im if (reporter != null) { reporter.reportNumBytesIn(len); } - if (numBytesIn != null) { - numBytesIn.inc(len); - } if (len <= 
nonSpanningRemaining - 4) { // we can get a full record from here @@ -112,9 +104,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im if (reporter != null) { reporter.reportNumRecordsIn(1); } - if (numRecordsIn != null) { - numRecordsIn.inc(); - } return (this.nonSpanningWrapper.remaining() == 0) ? DeserializationResult.LAST_RECORD_FROM_BUFFER : @@ -142,9 +131,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im if (reporter != null) { reporter.reportNumRecordsIn(1); } - if (numRecordsIn != null) { - numRecordsIn.inc(); - } // move the remainder to the non-spanning wrapper // this does not copy it, only sets the memory segment @@ -179,12 +165,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im this.spanningWrapper.setReporter(reporter); } - @Override - public void instantiateMetrics(IOMetricGroup metrics) { - numBytesIn = metrics.getBytesInCounter(); - numRecordsIn = metrics.getRecordsInCounter(); - } - // ----------------------------------------------------------------------------------------------------------------- private static final class NonSpanningWrapper implements DataInputView { http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java index 2f0c1ac..e4c7890 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java @@ -23,7 +23,6 @@ import java.io.IOException; import 
org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -71,11 +70,4 @@ public interface RecordDeserializer<T extends IOReadableWritable> { * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read. */ void setReporter(AccumulatorRegistry.Reporter reporter); - - /** - * Instantiates all metrics. - * - * @param metrics metric group - */ - void instantiateMetrics(IOMetricGroup metrics); } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 6495650..b218de8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -55,7 +55,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R private AccumulatorRegistry.Reporter reporter; - private transient Counter numRecordsOut; private transient Counter numBytesOut; public SpanningRecordSerializer() { @@ -94,10 +93,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R if (numBytesOut != null) { numBytesOut.inc(len); } - - if (numRecordsOut != null) { - numRecordsOut.inc(); - } this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer(); @@ -204,6 +199,5 @@ public class SpanningRecordSerializer<T 
extends IOReadableWritable> implements R @Override public void instantiateMetrics(IOMetricGroup metrics) { numBytesOut = metrics.getBytesOutCounter(); - numRecordsOut = metrics.getRecordsOutCounter(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 7e96390..eab8e7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -22,8 +22,6 @@ import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.DataInputDeserializer; @@ -63,9 +61,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit private AccumulatorRegistry.Reporter reporter; - private Counter numRecordsIn; - private Counter numBytesIn; - public SpillingAdaptiveSpanningRecordDeserializer(String[] tmpDirectories) { this.nonSpanningWrapper = new NonSpanningWrapper(); this.spanningWrapper = new SpanningWrapper(tmpDirectories); @@ -114,9 +109,6 @@ public class 
SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit if (reporter != null) { reporter.reportNumBytesIn(len); } - if (numBytesIn != null) { - numBytesIn.inc(len); - } if (len <= nonSpanningRemaining - 4) { // we can get a full record from here @@ -126,9 +118,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit if (reporter != null) { reporter.reportNumRecordsIn(1); } - if (numRecordsIn != null) { - numRecordsIn.inc(); - } int remaining = this.nonSpanningWrapper.remaining(); if (remaining > 0) { @@ -168,9 +157,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit if (reporter != null) { reporter.reportNumRecordsIn(1); } - if (numRecordsIn != null) { - numRecordsIn.inc(); - } // move the remainder to the non-spanning wrapper // this does not copy it, only sets the memory segment @@ -202,12 +188,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit this.spanningWrapper.setReporter(reporter); } - @Override - public void instantiateMetrics(IOMetricGroup metrics) { - numBytesIn = metrics.getBytesInCounter(); - numRecordsIn = metrics.getRecordsInCounter(); - } - // ----------------------------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java index e6e078d..5d82903 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java @@ 
-18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -61,6 +62,8 @@ public abstract class InputChannel { /** The maximum backoff (in ms). */ private final int maxBackoff; + protected final Counter numBytesIn; + /** The current backoff (in ms) */ private int currentBackoff; @@ -68,7 +71,8 @@ public abstract class InputChannel { SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, - Tuple2<Integer, Integer> initialAndMaxBackoff) { + Tuple2<Integer, Integer> initialAndMaxBackoff, + Counter numBytesIn) { checkArgument(channelIndex >= 0); @@ -84,6 +88,8 @@ public abstract class InputChannel { this.initialBackoff = initial; this.maxBackoff = max; this.currentBackoff = initial == 0 ? -1 : 0; + + this.numBytesIn = numBytesIn; } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 92b6d1f..fc35bef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import 
org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -67,10 +68,11 @@ public class LocalInputChannel extends InputChannel implements NotificationListe int channelIndex, ResultPartitionID partitionId, ResultPartitionManager partitionManager, - TaskEventDispatcher taskEventDispatcher) { + TaskEventDispatcher taskEventDispatcher, + IOMetricGroup metrics) { this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, - new Tuple2<Integer, Integer>(0, 0)); + new Tuple2<Integer, Integer>(0, 0), metrics); } LocalInputChannel( @@ -79,9 +81,10 @@ public class LocalInputChannel extends InputChannel implements NotificationListe ResultPartitionID partitionId, ResultPartitionManager partitionManager, TaskEventDispatcher taskEventDispatcher, - Tuple2<Integer, Integer> initialAndMaxBackoff) { + Tuple2<Integer, Integer> initialAndMaxBackoff, + IOMetricGroup metrics) { - super(inputGate, channelIndex, partitionId, initialAndMaxBackoff); + super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInLocalCounter()); this.partitionManager = checkNotNull(partitionManager); this.taskEventDispatcher = checkNotNull(taskEventDispatcher); @@ -165,6 +168,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe getNextLookAhead(); + numBytesIn.inc(next.getSize()); return next; } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 814e961..b316fd9 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; @@ -80,10 +81,11 @@ public class RemoteInputChannel extends InputChannel { int channelIndex, ResultPartitionID partitionId, ConnectionID connectionId, - ConnectionManager connectionManager) { + ConnectionManager connectionManager, + IOMetricGroup metrics) { this(inputGate, channelIndex, partitionId, connectionId, connectionManager, - new Tuple2<Integer, Integer>(0, 0)); + new Tuple2<Integer, Integer>(0, 0), metrics); } public RemoteInputChannel( @@ -92,9 +94,10 @@ public class RemoteInputChannel extends InputChannel { ResultPartitionID partitionId, ConnectionID connectionId, ConnectionManager connectionManager, - Tuple2<Integer, Integer> initialAndMaxBackoff) { + Tuple2<Integer, Integer> initialAndMaxBackoff, + IOMetricGroup metrics) { - super(inputGate, channelIndex, partitionId, initialAndMaxBackoff); + super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInRemoteCounter()); this.connectionId = checkNotNull(connectionId); this.connectionManager = checkNotNull(connectionManager); @@ -148,6 +151,7 @@ public class RemoteInputChannel extends InputChannel { throw new IOException("Queried input channel for data although non is available."); } + numBytesIn.inc(buffer.getSize()); return buffer; } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 81d202a..90e395c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import com.google.common.collect.Maps; import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; @@ -173,7 +174,8 @@ public class SingleInputGate implements InputGate { IntermediateDataSetID consumedResultId, int consumedSubpartitionIndex, int numberOfInputChannels, - PartitionStateChecker partitionStateChecker) { + PartitionStateChecker partitionStateChecker, + IOMetricGroup metrics) { this.owningTaskName = checkNotNull(owningTaskName); this.jobId = checkNotNull(jobId); @@ -502,7 +504,8 @@ public class SingleInputGate implements InputGate { JobID jobId, ExecutionAttemptID executionId, InputGateDeploymentDescriptor igdd, - NetworkEnvironment networkEnvironment) { + NetworkEnvironment networkEnvironment, + IOMetricGroup metrics) { final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId()); @@ -512,7 +515,8 @@ public class SingleInputGate implements InputGate { final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors()); final SingleInputGate inputGate = new SingleInputGate( - owningTaskName, jobId, executionId, 
consumedResultId, consumedSubpartitionIndex, icdd.length, networkEnvironment.getPartitionStateChecker()); + owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex, + icdd.length, networkEnvironment.getPartitionStateChecker(), metrics); // Create the input channels. There is one input channel for each consumed partition. final InputChannel[] inputChannels = new InputChannel[icdd.length]; @@ -526,13 +530,16 @@ public class SingleInputGate implements InputGate { inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId, networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher(), - networkEnvironment.getPartitionRequestInitialAndMaxBackoff()); + networkEnvironment.getPartitionRequestInitialAndMaxBackoff(), + metrics + ); } else if (partitionLocation.isRemote()) { inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId, partitionLocation.getConnectionId(), networkEnvironment.getConnectionManager(), - networkEnvironment.getPartitionRequestInitialAndMaxBackoff() + networkEnvironment.getPartitionRequestInitialAndMaxBackoff(), + metrics ); } else if (partitionLocation.isUnknown()) { @@ -540,7 +547,8 @@ public class SingleInputGate implements InputGate { networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher(), networkEnvironment.getConnectionManager(), - networkEnvironment.getPartitionRequestInitialAndMaxBackoff() + networkEnvironment.getPartitionRequestInitialAndMaxBackoff(), + metrics ); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java index 
015f3fa..840c805 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; @@ -47,6 +48,8 @@ public class UnknownInputChannel extends InputChannel { /** Initial and maximum backoff (in ms) after failed partition requests. */ private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff; + private final IOMetricGroup metrics; + public UnknownInputChannel( SingleInputGate gate, int channelIndex, @@ -54,14 +57,16 @@ public class UnknownInputChannel extends InputChannel { ResultPartitionManager partitionManager, TaskEventDispatcher taskEventDispatcher, ConnectionManager connectionManager, - Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff) { + Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff, + IOMetricGroup metrics) { - super(gate, channelIndex, partitionId, partitionRequestInitialAndMaxBackoff); + super(gate, channelIndex, partitionId, partitionRequestInitialAndMaxBackoff, null); this.partitionManager = checkNotNull(partitionManager); this.taskEventDispatcher = checkNotNull(taskEventDispatcher); this.connectionManager = checkNotNull(connectionManager); this.partitionRequestInitialAndMaxBackoff = checkNotNull(partitionRequestInitialAndMaxBackoff); + this.metrics = checkNotNull(metrics); } @Override @@ -112,10 +117,10 @@ public class UnknownInputChannel extends InputChannel { // ------------------------------------------------------------------------ public RemoteInputChannel toRemoteInputChannel(ConnectionID producerAddress) { - return new 
RemoteInputChannel(inputGate, channelIndex, partitionId, checkNotNull(producerAddress), connectionManager, partitionRequestInitialAndMaxBackoff); + return new RemoteInputChannel(inputGate, channelIndex, partitionId, checkNotNull(producerAddress), connectionManager, partitionRequestInitialAndMaxBackoff, metrics); } public LocalInputChannel toLocalInputChannel() { - return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, partitionRequestInitialAndMaxBackoff); + return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, partitionRequestInitialAndMaxBackoff, metrics); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java index e034dd6..406d430 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java @@ -25,12 +25,15 @@ import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashJoinIterator; import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashJoinIterator; import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashJoinIterator; import 
org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashJoinIterator; import org.apache.flink.runtime.operators.util.JoinTaskIterator; import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -63,13 +66,15 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo @Override public void initialize() throws Exception { TaskConfig config = this.taskContext.getTaskConfig(); + + final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer(); TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer(); TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0); TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1); - MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0); - MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1); + MutableObjectIterator<IT1> input1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn); + MutableObjectIterator<IT2> input2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn); TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader()); @@ -164,8 +169,9 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo @Override public void run() throws Exception { + final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub(); - final Collector<OT> collector = 
this.taskContext.getOutputCollector(); + final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector)); } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java index 2589ca5..a28e27e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java @@ -23,10 +23,13 @@ import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.util.JoinTaskIterator; import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.slf4j.Logger; @@ -84,9 +87,10 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl final double driverMemFraction = config.getRelativeMemoryDriver(); final DriverStrategy ls = config.getDriverStrategy(); - - final MutableObjectIterator<IT1> 
in1 = this.taskContext.getInput(0); - final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1); + + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final MutableObjectIterator<IT1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn); + final MutableObjectIterator<IT2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn); // get serializers and comparators final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer(); @@ -147,8 +151,10 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl @Override public void run() throws Exception { + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final FlatJoinFunction<IT1, IT2, OT> joinStub = this.taskContext.getStub(); - final Collector<OT> collector = this.taskContext.getOutputCollector(); + final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); final JoinTaskIterator<IT1, IT2, OT> outerJoinIterator = this.outerJoinIterator; while (this.running && outerJoinIterator.callWithNextKey(joinStub, collector)) ; http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java index 0c8dc34..f0673c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java @@ -23,6 +23,9 @@ import org.apache.flink.api.common.ExecutionConfig; import 
org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper; import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper; import org.apache.flink.util.Collector; @@ -91,12 +94,15 @@ public class AllGroupCombineDriver<IN, OUT> implements Driver<GroupCombineFuncti LOG.debug("AllGroupCombine starting."); } + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0); TypeSerializer<IN> serializer = serializerFactory.getSerializer(); - final MutableObjectIterator<IN> in = this.taskContext.getInput(0); + final MutableObjectIterator<IN> in = new CountingMutableObjectIterator<>(this.taskContext.<IN>getInput(0), numRecordsIn); final GroupCombineFunction<IN, OUT> reducer = this.taskContext.getStub(); - final Collector<OUT> output = this.taskContext.getOutputCollector(); + final Collector<OUT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { final ReusingMutableToRegularIteratorWrapper<IN> inIter = new ReusingMutableToRegularIteratorWrapper<IN>(in, serializer); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java index e8545e7..13d7222 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java @@ -20,6 +20,9 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.ReduceFunction; @@ -104,14 +107,19 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> { LOG.debug(this.taskContext.formatLogString("AllReduce preprocessing done. Running Reducer code.")); } + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final ReduceFunction<T> stub = this.taskContext.getStub(); final MutableObjectIterator<T> input = this.input; final TypeSerializer<T> serializer = this.serializer; + final Collector<T> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); T val1; if ((val1 = input.next()) == null) { return; } + numRecordsIn.inc(); if (objectReuseEnabled) { // We only need two objects. 
The first reference stores results and is @@ -121,6 +129,7 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> { T value = val1; while (running && (val2 = input.next(val2)) != null) { + numRecordsIn.inc(); value = stub.reduce(value, val2); // we must never read into the object returned @@ -132,14 +141,15 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> { } } - this.taskContext.getOutputCollector().collect(value); + collector.collect(value); } else { T val2; while (running && (val2 = input.next()) != null) { + numRecordsIn.inc(); val1 = stub.reduce(val1, val2); } - this.taskContext.getOutputCollector().collect(val1); + collector.collect(val1); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index f38b988..36965ab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -676,7 +676,6 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme } inputReaders[i].setReporter(reporter); - inputReaders[i].setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); currentReaderOffset += groupSize; } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java index 665ab0e..43a913d 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java @@ -20,7 +20,10 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.CoGroupFunction; @@ -93,9 +96,11 @@ public class CoGroupDriver<IT1, IT2, OT> implements Driver<CoGroupFunction<IT1, if (config.getDriverStrategy() != DriverStrategy.CO_GROUP) { throw new Exception("Unrecognized driver strategy for CoGoup driver: " + config.getDriverStrategy().name()); } + + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); - final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0); - final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1); + final MutableObjectIterator<IT1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn); + final MutableObjectIterator<IT2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn); // get the key positions and types final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer(); @@ -144,8 +149,10 @@ public class CoGroupDriver<IT1, IT2, OT> implements Driver<CoGroupFunction<IT1, @Override public void run() throws Exception { + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + final CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub(); - final Collector<OT> collector = this.taskContext.getOutputCollector(); + final 
Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); final CoGroupTaskIterator<IT1, IT2> coGroupIterator = this.coGroupIterator; while (this.running && coGroupIterator.next()) { http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java index c9d84b1..3e1d01f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java @@ -20,6 +20,9 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.CrossFunction; @@ -194,9 +197,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT> LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: " + "First input is outer (blocking) side, second input is inner (spilling) side.")); } - - final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0); - final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1); + + final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + + final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn); + final 
MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn); final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer(); final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer(); @@ -213,7 +219,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT> final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); - final Collector<OT> collector = this.taskContext.getOutputCollector(); + final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { @@ -259,9 +265,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT> LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: " + "First input is inner (spilling) side, second input is outer (blocking) side.")); } - - final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0); - final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1); + + final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + + final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn); + final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn); final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer(); final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer(); @@ -277,7 +286,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT> this.blockIter = blockVals; final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); - final Collector<OT> collector = 
this.taskContext.getOutputCollector(); + final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { final T1 val1Reuse = serializer1.createInstance(); @@ -322,9 +331,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT> LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: " + "First input is outer side, second input is inner (spilling) side.")); } - - final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0); - final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1); + + final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + + final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn); + final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn); final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer(); final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer(); @@ -335,7 +347,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT> this.spillIter = spillVals; final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); - final Collector<OT> collector = this.taskContext.getOutputCollector(); + final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { final T1 val1Reuse = serializer1.createInstance(); @@ -372,8 +384,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT> LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: " + "First input is inner (spilling) side, second input is outer side.")); } - final 
MutableObjectIterator<T1> in1 = this.taskContext.getInput(0); - final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1); + + final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut"); + + final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn); + final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn); final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer(); final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer(); @@ -384,7 +400,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT> this.spillIter = spillVals; final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); - final Collector<OT> collector = this.taskContext.getOutputCollector(); + final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { final T1 val1Reuse = serializer1.createInstance(); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index 380edd4..b73c85a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RuntimeContext; import 
org.apache.flink.api.common.io.CleanupWhenUnsuccessful; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.io.RichOutputFormat; @@ -27,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; @@ -104,8 +106,11 @@ public class DataSinkTask<IT> extends AbstractInvokable { // -------------------------------------------------------------------- LOG.debug(getLogString("Starting data sink operator")); + RuntimeContext ctx = createRuntimeContext(); + final Counter numRecordsIn = ctx.getMetricGroup().counter("numRecordsIn"); + if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){ - ((RichOutputFormat) this.format).setRuntimeContext(createRuntimeContext()); + ((RichOutputFormat) this.format).setRuntimeContext(ctx); LOG.debug(getLogString("Rich Sink detected. Initializing runtime context.")); } @@ -174,6 +179,7 @@ public class DataSinkTask<IT> extends AbstractInvokable { // work! while (!this.taskCanceled && ((record = input.next(record)) != null)) { + numRecordsIn.inc(); format.writeRecord(record); } } else { @@ -181,6 +187,7 @@ public class DataSinkTask<IT> extends AbstractInvokable { // work! 
while (!this.taskCanceled && ((record = input.next()) != null)) { + numRecordsIn.inc(); format.writeRecord(record); } } @@ -349,8 +356,6 @@ public class DataSinkTask<IT> extends AbstractInvokable { inputReader.setReporter(reporter); - inputReader.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); - this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader()); @SuppressWarnings({ "rawtypes" }) final MutableObjectIterator<?> iter = new ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer()); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 819b84f..c57f133 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -19,12 +19,14 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; @@ -35,6 +37,7 @@ import org.apache.flink.runtime.operators.chaining.ChainedDriver; import 
org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException; import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext; import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,8 +100,12 @@ public class DataSourceTask<OT> extends AbstractInvokable { // -------------------------------------------------------------------- LOG.debug(getLogString("Starting data source operator")); + RuntimeContext ctx = createRuntimeContext(); + Counter splitCounter = ctx.getMetricGroup().counter("numSplitsProcessed"); + Counter numRecordsOut = ctx.getMetricGroup().counter("numRecordsOut"); + if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) { - ((RichInputFormat) this.format).setRuntimeContext(createRuntimeContext()); + ((RichInputFormat) this.format).setRuntimeContext(ctx); LOG.debug(getLogString("Rich Source detected. Initializing runtime context.")); ((RichInputFormat) this.format).openInputFormat(); LOG.debug(getLogString("Rich Source detected. Opening the InputFormat.")); @@ -135,7 +142,7 @@ public class DataSourceTask<OT> extends AbstractInvokable { LOG.debug(getLogString("Starting to read input from split " + split.toString())); try { - final Collector<OT> output = this.output; + final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut); if (objectReuseEnabled) { OT reuse = serializer.createInstance(); @@ -165,6 +172,7 @@ public class DataSourceTask<OT> extends AbstractInvokable { // close. We close here such that a regular close throwing an exception marks a task as failed. format.close(); } + splitCounter.inc(); } // end for all input splits // close the collector. 
if it is a chaining task collector, it will close its chained tasks http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java index c29923b..5b4a6ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.slf4j.Logger; @@ -83,22 +85,26 @@ public class FlatMapDriver<IT, OT> implements Driver<FlatMapFunction<IT, OT>, OT @Override public void run() throws Exception { + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); // cache references on the stack final MutableObjectIterator<IT> input = this.taskContext.getInput(0); final FlatMapFunction<IT, OT> function = this.taskContext.getStub(); - final Collector<OT> output = this.taskContext.getOutputCollector(); + final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance(); while (this.running && ((record = input.next(record)) != null)) { + numRecordsIn.inc(); function.flatMap(record, output); } } 
else { IT record; while (this.running && ((record = input.next()) != null)) { + numRecordsIn.inc(); function.flatMap(record, output); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java index d6825ac..ccd88ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java @@ -19,6 +19,9 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.GroupReduceFunction; @@ -89,9 +92,11 @@ public class GroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT, if (config.getDriverStrategy() != DriverStrategy.SORTED_GROUP_REDUCE) { throw new Exception("Unrecognized driver strategy for GroupReduce driver: " + config.getDriverStrategy().name()); } + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + this.serializer = this.taskContext.<IT>getInputSerializer(0).getSerializer(); this.comparator = this.taskContext.getDriverComparator(0); - this.input = this.taskContext.getInput(0); + this.input = new CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn); ExecutionConfig executionConfig = taskContext.getExecutionConfig(); this.objectReuseEnabled = executionConfig.isObjectReuseEnabled(); @@ 
-106,10 +111,11 @@ public class GroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT, if (LOG.isDebugEnabled()) { LOG.debug(this.taskContext.formatLogString("GroupReducer preprocessing done. Running GroupReducer code.")); } + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); // cache references on the stack final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub(); - final Collector<OT> output = this.taskContext.getOutputCollector(); + final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { final ReusingKeyGroupedIterator<IT> iter = new ReusingKeyGroupedIterator<IT>(this.input, this.serializer, this.comparator); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java index f7ad8d1..efb59a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator; @@ -34,6 +35,8 @@ import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator; import 
org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator; import org.apache.flink.runtime.operators.util.JoinTaskIterator; import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -84,6 +87,8 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT @Override public void prepare() throws Exception{ final TaskConfig config = this.taskContext.getTaskConfig(); + + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); // obtain task manager's memory manager and I/O manager final MemoryManager memoryManager = this.taskContext.getMemoryManager(); @@ -96,8 +101,8 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT // test minimum memory requirements final DriverStrategy ls = config.getDriverStrategy(); - final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0); - final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1); + final MutableObjectIterator<IT1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn); + final MutableObjectIterator<IT2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn); // get the key positions and types final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer(); @@ -209,8 +214,9 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT @Override public void run() throws Exception { + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); final FlatJoinFunction<IT1, IT2, OT> joinStub = this.taskContext.getStub(); - final Collector<OT> collector = this.taskContext.getOutputCollector(); + final 
Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); final JoinTaskIterator<IT1, IT2, OT> joinIterator = this.joinIterator; while (this.running && joinIterator.callWithNextKey(joinStub, collector)); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java index eefe8e4..65f9061 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -78,15 +80,18 @@ public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> { @Override public void run() throws Exception { + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); // cache references on the stack final MutableObjectIterator<IT> input = this.taskContext.getInput(0); final MapFunction<IT, OT> function = this.taskContext.getStub(); - final Collector<OT> output = this.taskContext.getOutputCollector(); + final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance(); 
while (this.running && ((record = input.next(record)) != null)) { + numRecordsIn.inc(); output.collect(function.map(record)); } } @@ -94,6 +99,7 @@ public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> { IT record = null; while (this.running && ((record = input.next()) != null)) { + numRecordsIn.inc(); output.collect(function.map(record)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java index 8f245f0..3496e14 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java @@ -21,6 +21,9 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; +import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator; import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper; import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper; import org.apache.flink.util.Collector; @@ -83,10 +86,12 @@ public class MapPartitionDriver<IT, OT> implements Driver<MapPartitionFunction<I @Override public void run() throws Exception { + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); // cache references on the stack - final MutableObjectIterator<IT> input = 
this.taskContext.getInput(0); + final MutableObjectIterator<IT> input = new CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn); final MapPartitionFunction<IT, OT> function = this.taskContext.getStub(); - final Collector<OT> output = this.taskContext.getOutputCollector(); + final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { final ReusingMutableToRegularIteratorWrapper<IT> inIter = new ReusingMutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer()); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java index 9b08fad..802227a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java @@ -61,6 +61,7 @@ public class NoOpChainedDriver<IT> extends ChainedDriver<IT, IT> { @Override public void collect(IT record) { try { + this.numRecordsIn.inc(); this.outputCollector.collect(record); } catch (Exception ex) { throw new ExceptionInChainedStubException(this.taskName, ex); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java index 073a837..dd64b76 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.slf4j.Logger; @@ -75,18 +77,22 @@ public class NoOpDriver<T> implements Driver<AbstractRichFunction, T> { @Override public void run() throws Exception { // cache references on the stack + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); final MutableObjectIterator<T> input = this.taskContext.getInput(0); - final Collector<T> output = this.taskContext.getOutputCollector(); + final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { T record = this.taskContext.<T>getInputSerializer(0).getSerializer().createInstance(); while (this.running && ((record = input.next(record)) != null)) { + numRecordsIn.inc(); output.collect(record); } } else { T record; while (this.running && ((record = input.next()) != null)) { + numRecordsIn.inc(); output.collect(record); } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java index 1ceeaf0..e1ce39e 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.util.List; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.ReduceFunction; @@ -104,13 +106,15 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> { if (this.taskContext.getTaskConfig().getDriverStrategy() != DriverStrategy.SORTED_PARTIAL_REDUCE) { throw new Exception("Invalid strategy " + this.taskContext.getTaskConfig().getDriverStrategy() + " for reduce combiner."); } + + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); // instantiate the serializer / comparator final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0); this.comparator = this.taskContext.getDriverComparator(0); this.serializer = serializerFactory.getSerializer(); this.reducer = this.taskContext.getStub(); - this.output = this.taskContext.getOutputCollector(); + this.output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); MemoryManager memManager = this.taskContext.getMemoryManager(); final int numMemoryPages = memManager.computeNumberOfPages( @@ -140,6 +144,8 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> { LOG.debug("Combiner starting."); } + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final MutableObjectIterator<T> in = this.taskContext.getInput(0); final TypeSerializer<T> serializer = this.serializer; @@ -147,6 +153,7 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> { T value = 
serializer.createInstance(); while (running && (value = in.next(value)) != null) { + numRecordsIn.inc(); // try writing to the sorter first if (this.sorter.write(value)) { @@ -166,6 +173,7 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> { else { T value; while (running && (value = in.next()) != null) { + numRecordsIn.inc(); // try writing to the sorter first if (this.sorter.write(value)) { http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java index 3b7af6e..eb4f2f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.ReduceFunction; @@ -106,6 +108,9 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> { LOG.debug(this.taskContext.formatLogString("Reducer preprocessing done. 
Running Reducer code.")); } + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); + // cache references on the stack final MutableObjectIterator<T> input = this.input; final TypeSerializer<T> serializer = this.serializer; @@ -113,7 +118,7 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> { final ReduceFunction<T> function = this.taskContext.getStub(); - final Collector<T> output = this.taskContext.getOutputCollector(); + final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); if (objectReuseEnabled) { // We only need two objects. The first reference stores results and is @@ -128,10 +133,12 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> { // iterate over key groups while (this.running && value != null) { + numRecordsIn.inc(); comparator.setReference(value); // iterate within a key group while ((reuse2 = input.next(reuse2)) != null) { + numRecordsIn.inc(); if (comparator.equalToReference(reuse2)) { // same group, reduce value = function.reduce(value, reuse2); @@ -163,11 +170,13 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> { // iterate over key groups while (this.running && value != null) { + numRecordsIn.inc(); comparator.setReference(value); T res = value; // iterate within a key group while ((value = input.next()) != null) { + numRecordsIn.inc(); if (comparator.equalToReference(value)) { // same group, reduce res = function.reduce(res, value); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java index 4791761..3d52925 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -58,18 +60,22 @@ public class UnionWithTempOperator<T> implements Driver<Function, T> { @Override public void run() throws Exception { + final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn"); + final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut"); - final Collector<T> output = this.taskContext.getOutputCollector(); + final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut); T reuse = this.taskContext.<T>getInputSerializer(STREAMED_INPUT).getSerializer().createInstance(); T record; final MutableObjectIterator<T> input = this.taskContext.getInput(STREAMED_INPUT); while (this.running && ((record = input.next(reuse)) != null)) { + numRecordsIn.inc(); output.collect(record); } final MutableObjectIterator<T> cache = this.taskContext.getInput(CACHED_INPUT); while (this.running && ((record = cache.next(reuse)) != null)) { + numRecordsIn.inc(); output.collect(record); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java index 46ee41b..1e3482f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java @@ -86,6 +86,7 @@ public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> { // -------------------------------------------------------------------------------------------- @Override public void collect(IT record) { + numRecordsIn.inc(); try { if (base == null) { base = objectReuseEnabled ? record : serializer.copy(record); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java index 407716f..2560135 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java @@ -22,12 +22,14 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext; import org.apache.flink.runtime.operators.util.TaskConfig; +import 
org.apache.flink.runtime.operators.util.metrics.CountingCollector; import org.apache.flink.util.Collector; import java.util.Map; @@ -53,6 +55,10 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> { protected boolean objectReuseEnabled = false; protected MetricGroup metrics; + + protected Counter numRecordsIn; + + protected Counter numRecordsOut; public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector, @@ -61,9 +67,11 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> { { this.config = config; this.taskName = taskName; - this.outputCollector = outputCollector; this.userCodeClassLoader = userCodeClassLoader; this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName); + this.numRecordsIn = this.metrics.counter("numRecordsIn"); + this.numRecordsOut = this.metrics.counter("numRecordsOut"); + this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut); Environment env = parent.getEnvironment(); @@ -103,7 +111,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> { @SuppressWarnings("unchecked") public void setOutputCollector(Collector<?> outputCollector) { - this.outputCollector = (Collector<OT>) outputCollector; + this.outputCollector = new CountingCollector<>((Collector<OT>) outputCollector, numRecordsOut); } public Collector<OT> getOutputCollector() {