[FLINK-3949] Collect Metrics in Runtime Operators

Collects the following metrics:
- currentLowWatermark
- lastCheckpointSize
- numBytes(In/Out)(Local/Remote)
- numRecords(In/Out)
- numSplitsProcessed (Batch only)

This closes #2090


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c78b3c49
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c78b3c49
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c78b3c49

Branch: refs/heads/master
Commit: c78b3c49e0e82874cbfa71e88bf28b99ed395610
Parents: 1049585
Author: zentol <ches...@apache.org>
Authored: Wed Jun 1 11:24:40 2016 +0200
Committer: zentol <ches...@apache.org>
Committed: Thu Jun 16 12:24:57 2016 +0200

----------------------------------------------------------------------
 .../flink/metrics/groups/IOMetricGroup.java     | 22 +++------
 .../api/reader/AbstractRecordReader.java        |  8 ---
 .../io/network/api/reader/BufferReader.java     |  5 --
 .../io/network/api/reader/ReaderBase.java       |  8 ---
 .../AdaptiveSpanningRecordDeserializer.java     | 20 --------
 .../api/serialization/RecordDeserializer.java   |  8 ---
 .../serialization/SpanningRecordSerializer.java |  6 ---
 ...llingAdaptiveSpanningRecordDeserializer.java | 20 --------
 .../partition/consumer/InputChannel.java        |  8 ++-
 .../partition/consumer/LocalInputChannel.java   | 12 +++--
 .../partition/consumer/RemoteInputChannel.java  | 12 +++--
 .../partition/consumer/SingleInputGate.java     | 20 +++++---
 .../partition/consumer/UnknownInputChannel.java | 13 +++--
 .../AbstractCachedBuildSideJoinDriver.java      | 12 +++--
 .../operators/AbstractOuterJoinDriver.java      | 14 ++++--
 .../operators/AllGroupCombineDriver.java        | 10 +++-
 .../runtime/operators/AllReduceDriver.java      | 14 +++++-
 .../flink/runtime/operators/BatchTask.java      |  1 -
 .../flink/runtime/operators/CoGroupDriver.java  | 13 +++--
 .../flink/runtime/operators/CrossDriver.java    | 46 ++++++++++++------
 .../flink/runtime/operators/DataSinkTask.java   | 11 +++--
 .../flink/runtime/operators/DataSourceTask.java | 12 ++++-
 .../flink/runtime/operators/FlatMapDriver.java  |  8 ++-
 .../runtime/operators/GroupReduceDriver.java    | 10 +++-
 .../flink/runtime/operators/JoinDriver.java     | 12 +++--
 .../flink/runtime/operators/MapDriver.java      |  8 ++-
 .../runtime/operators/MapPartitionDriver.java   |  9 +++-
 .../runtime/operators/NoOpChainedDriver.java    |  1 +
 .../flink/runtime/operators/NoOpDriver.java     |  8 ++-
 .../runtime/operators/ReduceCombineDriver.java  | 10 +++-
 .../flink/runtime/operators/ReduceDriver.java   | 11 ++++-
 .../operators/UnionWithTempOperator.java        |  8 ++-
 .../chaining/ChainedAllReduceDriver.java        |  1 +
 .../operators/chaining/ChainedDriver.java       | 12 ++++-
 .../chaining/ChainedFlatMapDriver.java          |  1 +
 .../operators/chaining/ChainedMapDriver.java    |  1 +
 .../ChainedTerminationCriterionDriver.java      |  1 +
 .../chaining/GroupCombineChainedDriver.java     |  1 +
 .../SynchronousChainedCombineDriver.java        |  1 +
 .../util/metrics/CountingCollector.java         | 42 ++++++++++++++++
 .../util/metrics/CountingIterable.java          | 38 +++++++++++++++
 .../util/metrics/CountingIterator.java          | 48 ++++++++++++++++++
 .../metrics/CountingMutableObjectIterator.java  | 51 ++++++++++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  3 +-
 .../network/api/reader/AbstractReaderTest.java  |  5 --
 .../partition/consumer/InputChannelTest.java    |  3 +-
 .../consumer/LocalInputChannelTest.java         | 10 ++--
 .../consumer/RemoteInputChannelTest.java        | 10 ++--
 .../partition/consumer/SingleInputGateTest.java | 19 +++++---
 .../partition/consumer/TestSingleInputGate.java |  3 +-
 .../partition/consumer/UnionInputGateTest.java  |  5 +-
 .../testutils/UnregisteredTaskMetricsGroup.java | 16 ++++++
 .../api/operators/AbstractStreamOperator.java   | 30 +++++++++++-
 .../streaming/api/operators/StreamSource.java   |  4 ++
 .../runtime/io/StreamInputProcessor.java        | 17 +++++--
 .../runtime/io/StreamTwoInputProcessor.java     | 10 ++--
 .../streaming/runtime/tasks/OperatorChain.java  |  5 ++
 .../runtime/tasks/SourceStreamTask.java         |  2 +-
 .../runtime/tasks/StreamIterationTail.java      | 42 ++++++++++++----
 .../streaming/runtime/tasks/StreamTask.java     | 12 +++++
 60 files changed, 573 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
index b34c844..46bf2af 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
@@ -26,33 +26,27 @@ import org.apache.flink.metrics.MetricRegistry;
  */
 public class IOMetricGroup extends AbstractMetricGroup {
 
-       private final Counter numBytesIn;
        private final Counter numBytesOut;
-       private final Counter numRecordsIn;
-       private final Counter numRecordsOut;
+       private final Counter numBytesInLocal;
+       private final Counter numBytesInRemote;
 
        public IOMetricGroup(MetricRegistry registry, TaskMetricGroup parent) {
                super(registry, parent.getScopeComponents());
-
-               this.numBytesIn = parent.counter("numBytesIn");
                this.numBytesOut = parent.counter("numBytesOut");
-               this.numRecordsIn = parent.counter("numRecordsIn");
-               this.numRecordsOut = parent.counter("numRecordsOut");
-       }
 
-       public Counter getBytesInCounter() {
-               return numBytesIn;
+               this.numBytesInLocal = parent.counter("numBytesInLocal");
+               this.numBytesInRemote = parent.counter("numBytesInRemote");
        }
 
        public Counter getBytesOutCounter() {
                return numBytesOut;
        }
 
-       public Counter getRecordsInCounter() {
-               return numRecordsIn;
+       public Counter getNumBytesInLocalCounter() {
+               return numBytesInLocal;
        }
 
-       public Counter getRecordsOutCounter() {
-               return numRecordsOut;
+       public Counter getNumBytesInRemoteCounter() {
+               return numBytesInRemote;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 48ac558..e0fe355 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.api.reader;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
@@ -131,11 +130,4 @@ abstract class AbstractRecordReader<T extends 
IOReadableWritable> extends Abstra
                        deserializer.setReporter(reporter);
                }
        }
-       
-       @Override
-       public void setMetricGroup(IOMetricGroup metrics) {
-               for (RecordDeserializer<?> deserializer : recordDeserializers) {
-                       deserializer.instantiateMetrics(metrics);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
index e5f5930..debb352 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
@@ -54,8 +53,4 @@ public final class BufferReader extends AbstractReader {
        public void setReporter(AccumulatorRegistry.Reporter reporter) {
 
        }
-
-       @Override
-       public void setMetricGroup(IOMetricGroup metrics) {
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
index 192a9ab..a1d705f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader;
 
 import java.io.IOException;
 
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
@@ -58,11 +57,4 @@ public interface ReaderBase {
         */
        void setReporter(AccumulatorRegistry.Reporter reporter);
 
-       /**
-        * Setter for the metric group.
-        *
-        * @param metrics metric group to set
-        */
-       void setMetricGroup(IOMetricGroup metrics);
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
index 1c17476..cdd8731 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
@@ -49,9 +47,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
        private Buffer currentBuffer;
 
        private AccumulatorRegistry.Reporter reporter;
-       
-       private transient Counter numRecordsIn;
-       private transient Counter numBytesIn;
 
        public AdaptiveSpanningRecordDeserializer() {
                this.nonSpanningWrapper = new NonSpanningWrapper();
@@ -101,9 +96,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                        if (reporter != null) {
                                reporter.reportNumBytesIn(len);
                        }
-                       if (numBytesIn != null) {
-                               numBytesIn.inc(len);
-                       }
 
                        if (len <= nonSpanningRemaining - 4) {
                                // we can get a full record from here
@@ -112,9 +104,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                                if (reporter != null) {
                                        reporter.reportNumRecordsIn(1);
                                }
-                               if (numRecordsIn != null) {
-                                       numRecordsIn.inc();
-                               }
 
                                return (this.nonSpanningWrapper.remaining() == 
0) ?
                                                
DeserializationResult.LAST_RECORD_FROM_BUFFER :
@@ -142,9 +131,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                        if (reporter != null) {
                                reporter.reportNumRecordsIn(1);
                        }
-                       if (numRecordsIn != null) {
-                               numRecordsIn.inc();
-                       }
 
                        // move the remainder to the non-spanning wrapper
                        // this does not copy it, only sets the memory segment
@@ -179,12 +165,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                this.spanningWrapper.setReporter(reporter);
        }
 
-       @Override
-       public void instantiateMetrics(IOMetricGroup metrics) {
-               numBytesIn = metrics.getBytesInCounter();
-               numRecordsIn = metrics.getRecordsInCounter();
-       }
-
        // 
-----------------------------------------------------------------------------------------------------------------
 
        private static final class NonSpanningWrapper implements DataInputView {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
index 2f0c1ac..e4c7890 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
@@ -71,11 +70,4 @@ public interface RecordDeserializer<T extends 
IOReadableWritable> {
         * Setter for the reporter, e.g. for the number of records emitted and 
the number of bytes read.
         */
        void setReporter(AccumulatorRegistry.Reporter reporter);
-
-       /**
-        * Instantiates all metrics.
-        *
-        * @param metrics metric group
-        */
-       void instantiateMetrics(IOMetricGroup metrics);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 6495650..b218de8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -55,7 +55,6 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
 
        private AccumulatorRegistry.Reporter reporter;
 
-       private transient Counter numRecordsOut;
        private transient Counter numBytesOut;
 
        public SpanningRecordSerializer() {
@@ -94,10 +93,6 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
                if (numBytesOut != null) {
                        numBytesOut.inc(len);
                }
-               
-               if (numRecordsOut != null) {
-                       numRecordsOut.inc();
-               }
 
                this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
 
@@ -204,6 +199,5 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
        @Override
        public void instantiateMetrics(IOMetricGroup metrics) {
                numBytesOut = metrics.getBytesOutCounter();
-               numRecordsOut = metrics.getRecordsOutCounter();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 7e96390..eab8e7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -22,8 +22,6 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
@@ -63,9 +61,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
 
        private AccumulatorRegistry.Reporter reporter;
 
-       private Counter numRecordsIn;
-       private Counter numBytesIn;
-
        public SpillingAdaptiveSpanningRecordDeserializer(String[] 
tmpDirectories) {
                this.nonSpanningWrapper = new NonSpanningWrapper();
                this.spanningWrapper = new SpanningWrapper(tmpDirectories);
@@ -114,9 +109,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                        if (reporter != null) {
                                reporter.reportNumBytesIn(len);
                        }
-                       if (numBytesIn != null) {
-                               numBytesIn.inc(len);
-                       }
 
                        if (len <= nonSpanningRemaining - 4) {
                                // we can get a full record from here
@@ -126,9 +118,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                                        if (reporter != null) {
                                                reporter.reportNumRecordsIn(1);
                                        }
-                                       if (numRecordsIn != null) {
-                                               numRecordsIn.inc();
-                                       }
 
                                        int remaining = 
this.nonSpanningWrapper.remaining();
                                        if (remaining > 0) {
@@ -168,9 +157,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                        if (reporter != null) {
                                reporter.reportNumRecordsIn(1);
                        }
-                       if (numRecordsIn != null) {
-                               numRecordsIn.inc();
-                       }
                        
                        // move the remainder to the non-spanning wrapper
                        // this does not copy it, only sets the memory segment
@@ -202,12 +188,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                this.spanningWrapper.setReporter(reporter);
        }
 
-       @Override
-       public void instantiateMetrics(IOMetricGroup metrics) {
-               numBytesIn = metrics.getBytesInCounter();
-               numRecordsIn = metrics.getRecordsInCounter();
-       }
-
 
        // 
-----------------------------------------------------------------------------------------------------------------
        

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index e6e078d..5d82903 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -61,6 +62,8 @@ public abstract class InputChannel {
        /** The maximum backoff (in ms). */
        private final int maxBackoff;
 
+       protected final Counter numBytesIn;
+
        /** The current backoff (in ms) */
        private int currentBackoff;
 
@@ -68,7 +71,8 @@ public abstract class InputChannel {
                        SingleInputGate inputGate,
                        int channelIndex,
                        ResultPartitionID partitionId,
-                       Tuple2<Integer, Integer> initialAndMaxBackoff) {
+                       Tuple2<Integer, Integer> initialAndMaxBackoff,
+                       Counter numBytesIn) {
 
                checkArgument(channelIndex >= 0);
 
@@ -84,6 +88,8 @@ public abstract class InputChannel {
                this.initialBackoff = initial;
                this.maxBackoff = max;
                this.currentBackoff = initial == 0 ? -1 : 0;
+
+               this.numBytesIn = numBytesIn;
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 92b6d1f..fc35bef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -67,10 +68,11 @@ public class LocalInputChannel extends InputChannel 
implements NotificationListe
                        int channelIndex,
                        ResultPartitionID partitionId,
                        ResultPartitionManager partitionManager,
-                       TaskEventDispatcher taskEventDispatcher) {
+                       TaskEventDispatcher taskEventDispatcher,
+                       IOMetricGroup metrics) {
 
                this(inputGate, channelIndex, partitionId, partitionManager, 
taskEventDispatcher,
-                               new Tuple2<Integer, Integer>(0, 0));
+                               new Tuple2<Integer, Integer>(0, 0), metrics);
        }
 
        LocalInputChannel(
@@ -79,9 +81,10 @@ public class LocalInputChannel extends InputChannel 
implements NotificationListe
                        ResultPartitionID partitionId,
                        ResultPartitionManager partitionManager,
                        TaskEventDispatcher taskEventDispatcher,
-                       Tuple2<Integer, Integer> initialAndMaxBackoff) {
+                       Tuple2<Integer, Integer> initialAndMaxBackoff,
+                       IOMetricGroup metrics) {
 
-               super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff);
+               super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff, metrics.getNumBytesInLocalCounter());
 
                this.partitionManager = checkNotNull(partitionManager);
                this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
@@ -165,6 +168,7 @@ public class LocalInputChannel extends InputChannel 
implements NotificationListe
 
                getNextLookAhead();
 
+               numBytesIn.inc(next.getSize());
                return next;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 814e961..b316fd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -80,10 +81,11 @@ public class RemoteInputChannel extends InputChannel {
                        int channelIndex,
                        ResultPartitionID partitionId,
                        ConnectionID connectionId,
-                       ConnectionManager connectionManager) {
+                       ConnectionManager connectionManager,
+                       IOMetricGroup metrics) {
 
                this(inputGate, channelIndex, partitionId, connectionId, 
connectionManager,
-                               new Tuple2<Integer, Integer>(0, 0));
+                               new Tuple2<Integer, Integer>(0, 0), metrics);
        }
 
        public RemoteInputChannel(
@@ -92,9 +94,10 @@ public class RemoteInputChannel extends InputChannel {
                        ResultPartitionID partitionId,
                        ConnectionID connectionId,
                        ConnectionManager connectionManager,
-                       Tuple2<Integer, Integer> initialAndMaxBackoff) {
+                       Tuple2<Integer, Integer> initialAndMaxBackoff,
+                       IOMetricGroup metrics) {
 
-               super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff);
+               super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff, metrics.getNumBytesInRemoteCounter());
 
                this.connectionId = checkNotNull(connectionId);
                this.connectionManager = checkNotNull(connectionManager);
@@ -148,6 +151,7 @@ public class RemoteInputChannel extends InputChannel {
                                throw new IOException("Queried input channel 
for data although non is available.");
                        }
 
+                       numBytesIn.inc(buffer.getSize());
                        return buffer;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 81d202a..90e395c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Maps;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -173,7 +174,8 @@ public class SingleInputGate implements InputGate {
                        IntermediateDataSetID consumedResultId,
                        int consumedSubpartitionIndex,
                        int numberOfInputChannels,
-                       PartitionStateChecker partitionStateChecker) {
+                       PartitionStateChecker partitionStateChecker,
+                       IOMetricGroup metrics) {
 
                this.owningTaskName = checkNotNull(owningTaskName);
                this.jobId = checkNotNull(jobId);
@@ -502,7 +504,8 @@ public class SingleInputGate implements InputGate {
                        JobID jobId,
                        ExecutionAttemptID executionId,
                        InputGateDeploymentDescriptor igdd,
-                       NetworkEnvironment networkEnvironment) {
+                       NetworkEnvironment networkEnvironment,
+                       IOMetricGroup metrics) {
 
                final IntermediateDataSetID consumedResultId = 
checkNotNull(igdd.getConsumedResultId());
 
@@ -512,7 +515,8 @@ public class SingleInputGate implements InputGate {
                final InputChannelDeploymentDescriptor[] icdd = 
checkNotNull(igdd.getInputChannelDeploymentDescriptors());
 
                final SingleInputGate inputGate = new SingleInputGate(
-                               owningTaskName, jobId, executionId, 
consumedResultId, consumedSubpartitionIndex, icdd.length, 
networkEnvironment.getPartitionStateChecker());
+                               owningTaskName, jobId, executionId, 
consumedResultId, consumedSubpartitionIndex,
+                               icdd.length, 
networkEnvironment.getPartitionStateChecker(), metrics);
 
                // Create the input channels. There is one input channel for 
each consumed partition.
                final InputChannel[] inputChannels = new 
InputChannel[icdd.length];
@@ -526,13 +530,16 @@ public class SingleInputGate implements InputGate {
                                inputChannels[i] = new 
LocalInputChannel(inputGate, i, partitionId,
                                                
networkEnvironment.getPartitionManager(),
                                                
networkEnvironment.getTaskEventDispatcher(),
-                                               
networkEnvironment.getPartitionRequestInitialAndMaxBackoff());
+                                               
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
+                                               metrics
+                               );
                        }
                        else if (partitionLocation.isRemote()) {
                                inputChannels[i] = new 
RemoteInputChannel(inputGate, i, partitionId,
                                                
partitionLocation.getConnectionId(),
                                                
networkEnvironment.getConnectionManager(),
-                                               
networkEnvironment.getPartitionRequestInitialAndMaxBackoff()
+                                               
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
+                                               metrics
                                );
                        }
                        else if (partitionLocation.isUnknown()) {
@@ -540,7 +547,8 @@ public class SingleInputGate implements InputGate {
                                                
networkEnvironment.getPartitionManager(),
                                                
networkEnvironment.getTaskEventDispatcher(),
                                                
networkEnvironment.getConnectionManager(),
-                                               
networkEnvironment.getPartitionRequestInitialAndMaxBackoff()
+                                               
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
+                                               metrics
                                );
                        }
                        else {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 015f3fa..840c805 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -47,6 +48,8 @@ public class UnknownInputChannel extends InputChannel {
        /** Initial and maximum backoff (in ms) after failed partition 
requests. */
        private final Tuple2<Integer, Integer> 
partitionRequestInitialAndMaxBackoff;
 
+       private final IOMetricGroup metrics;
+
        public UnknownInputChannel(
                        SingleInputGate gate,
                        int channelIndex,
@@ -54,14 +57,16 @@ public class UnknownInputChannel extends InputChannel {
                        ResultPartitionManager partitionManager,
                        TaskEventDispatcher taskEventDispatcher,
                        ConnectionManager connectionManager,
-                       Tuple2<Integer, Integer> 
partitionRequestInitialAndMaxBackoff) {
+                       Tuple2<Integer, Integer> 
partitionRequestInitialAndMaxBackoff,
+                       IOMetricGroup metrics) {
 
-               super(gate, channelIndex, partitionId, 
partitionRequestInitialAndMaxBackoff);
+               super(gate, channelIndex, partitionId, 
partitionRequestInitialAndMaxBackoff, null);
 
                this.partitionManager = checkNotNull(partitionManager);
                this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
                this.connectionManager = checkNotNull(connectionManager);
                this.partitionRequestInitialAndMaxBackoff = 
checkNotNull(partitionRequestInitialAndMaxBackoff);
+               this.metrics = checkNotNull(metrics);
        }
 
        @Override
@@ -112,10 +117,10 @@ public class UnknownInputChannel extends InputChannel {
        // 
------------------------------------------------------------------------
 
        public RemoteInputChannel toRemoteInputChannel(ConnectionID 
producerAddress) {
-               return new RemoteInputChannel(inputGate, channelIndex, 
partitionId, checkNotNull(producerAddress), connectionManager, 
partitionRequestInitialAndMaxBackoff);
+               return new RemoteInputChannel(inputGate, channelIndex, 
partitionId, checkNotNull(producerAddress), connectionManager, 
partitionRequestInitialAndMaxBackoff, metrics);
        }
 
        public LocalInputChannel toLocalInputChannel() {
-               return new LocalInputChannel(inputGate, channelIndex, 
partitionId, partitionManager, taskEventDispatcher, 
partitionRequestInitialAndMaxBackoff);
+               return new LocalInputChannel(inputGate, channelIndex, 
partitionId, partitionManager, taskEventDispatcher, 
partitionRequestInitialAndMaxBackoff, metrics);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index e034dd6..406d430 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -25,12 +25,15 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.metrics.Counter;
 import 
org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashJoinIterator;
 import 
org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashJoinIterator;
 import 
org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashJoinIterator;
 import 
org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import 
org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -63,13 +66,15 @@ public abstract class 
AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
        @Override
        public void initialize() throws Exception {
                TaskConfig config = this.taskContext.getTaskConfig();
+
+               final Counter numRecordsIn = 
taskContext.getMetricGroup().counter("numRecordsIn");
                
                TypeSerializer<IT1> serializer1 = 
this.taskContext.<IT1>getInputSerializer(0).getSerializer();
                TypeSerializer<IT2> serializer2 = 
this.taskContext.<IT2>getInputSerializer(1).getSerializer();
                TypeComparator<IT1> comparator1 = 
this.taskContext.getDriverComparator(0);
                TypeComparator<IT2> comparator2 = 
this.taskContext.getDriverComparator(1);
-               MutableObjectIterator<IT1> input1 = 
this.taskContext.getInput(0);
-               MutableObjectIterator<IT2> input2 = 
this.taskContext.getInput(1);
+               MutableObjectIterator<IT1> input1 = new 
CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), 
numRecordsIn);
+               MutableObjectIterator<IT2> input2 = new 
CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), 
numRecordsIn);
 
                TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = 
                                
this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
@@ -164,8 +169,9 @@ public abstract class 
AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 
        @Override
        public void run() throws Exception {
+               final Counter numRecordsOut = 
taskContext.getMetricGroup().counter("numRecordsOut");
                final FlatJoinFunction<IT1, IT2, OT> matchStub = 
this.taskContext.getStub();
-               final Collector<OT> collector = 
this.taskContext.getOutputCollector();
+               final Collector<OT> collector = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
                
                while (this.running && matchIterator != null && 
matchIterator.callWithNextKey(matchStub, collector));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
index 2589ca5..a28e27e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
@@ -23,10 +23,13 @@ import 
org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import 
org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.slf4j.Logger;
@@ -84,9 +87,10 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> 
implements Driver<Fl
                final double driverMemFraction = 
config.getRelativeMemoryDriver();
                
                final DriverStrategy ls = config.getDriverStrategy();
-               
-               final MutableObjectIterator<IT1> in1 = 
this.taskContext.getInput(0);
-               final MutableObjectIterator<IT2> in2 = 
this.taskContext.getInput(1);
+
+               final Counter numRecordsIn = 
this.taskContext.getMetricGroup().counter("numRecordsIn");
+               final MutableObjectIterator<IT1> in1 = new 
CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), 
numRecordsIn);
+               final MutableObjectIterator<IT2> in2 = new 
CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), 
numRecordsIn);
                
                // get serializers and comparators
                final TypeSerializer<IT1> serializer1 = 
this.taskContext.<IT1>getInputSerializer(0).getSerializer();
@@ -147,8 +151,10 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, 
OT> implements Driver<Fl
        
        @Override
        public void run() throws Exception {
+               final Counter numRecordsOut = 
this.taskContext.getMetricGroup().counter("numRecordsOut");
+               
                final FlatJoinFunction<IT1, IT2, OT> joinStub = 
this.taskContext.getStub();
-               final Collector<OT> collector = 
this.taskContext.getOutputCollector();
+               final Collector<OT> collector = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
                final JoinTaskIterator<IT1, IT2, OT> outerJoinIterator = 
this.outerJoinIterator;
                
                while (this.running && 
outerJoinIterator.callWithNextKey(joinStub, collector)) ;

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
index 0c8dc34..f0673c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import 
org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.util.Collector;
@@ -91,12 +94,15 @@ public class AllGroupCombineDriver<IN, OUT> implements 
Driver<GroupCombineFuncti
                        LOG.debug("AllGroupCombine starting.");
                }
 
+               final Counter numRecordsIn = 
this.taskContext.getMetricGroup().counter("numRecordsIn");
+               final Counter numRecordsOut = 
this.taskContext.getMetricGroup().counter("numRecordsOut");
+
                final TypeSerializerFactory<IN> serializerFactory = 
this.taskContext.getInputSerializer(0);
                TypeSerializer<IN> serializer = 
serializerFactory.getSerializer();
 
-               final MutableObjectIterator<IN> in = 
this.taskContext.getInput(0);
+               final MutableObjectIterator<IN> in = new 
CountingMutableObjectIterator<>(this.taskContext.<IN>getInput(0), numRecordsIn);
                final GroupCombineFunction<IN, OUT> reducer = 
this.taskContext.getStub();
-               final Collector<OUT> output = 
this.taskContext.getOutputCollector();
+               final Collector<OUT> output = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
                if (objectReuseEnabled) {
                        final ReusingMutableToRegularIteratorWrapper<IN> inIter 
= new ReusingMutableToRegularIteratorWrapper<IN>(in, serializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index e8545e7..13d7222 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -20,6 +20,9 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -104,14 +107,19 @@ public class AllReduceDriver<T> implements 
Driver<ReduceFunction<T>, T> {
                        LOG.debug(this.taskContext.formatLogString("AllReduce 
preprocessing done. Running Reducer code."));
                }
 
+               final Counter numRecordsIn = 
this.taskContext.getMetricGroup().counter("numRecordsIn");
+               final Counter numRecordsOut = 
this.taskContext.getMetricGroup().counter("numRecordsOut");
+
                final ReduceFunction<T> stub = this.taskContext.getStub();
                final MutableObjectIterator<T> input = this.input;
                final TypeSerializer<T> serializer = this.serializer;
+               final Collector<T> collector = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
                T val1;
                if ((val1 = input.next()) == null) {
                        return;
                }
+               numRecordsIn.inc();
 
                if (objectReuseEnabled) {
                        // We only need two objects. The first reference stores 
results and is
@@ -121,6 +129,7 @@ public class AllReduceDriver<T> implements 
Driver<ReduceFunction<T>, T> {
                        T value = val1;
 
                        while (running && (val2 = input.next(val2)) != null) {
+                               numRecordsIn.inc();
                                value = stub.reduce(value, val2);
 
                                // we must never read into the object returned
@@ -132,14 +141,15 @@ public class AllReduceDriver<T> implements 
Driver<ReduceFunction<T>, T> {
                                }
                        }
 
-                       this.taskContext.getOutputCollector().collect(value);
+                       collector.collect(value);
                } else {
                        T val2;
                        while (running && (val2 = input.next()) != null) {
+                               numRecordsIn.inc();
                                val1 = stub.reduce(val1, val2);
                        }
 
-                       this.taskContext.getOutputCollector().collect(val1);
+                       collector.collect(val1);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index f38b988..36965ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -676,7 +676,6 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
                        }
 
                        inputReaders[i].setReporter(reporter);
-                       
inputReaders[i].setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
 
                        currentReaderOffset += groupSize;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
index 665ab0e..43a913d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
@@ -20,7 +20,10 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.Counter;
 import 
org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import 
org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -93,9 +96,11 @@ public class CoGroupDriver<IT1, IT2, OT> implements 
Driver<CoGroupFunction<IT1,
                if (config.getDriverStrategy() != DriverStrategy.CO_GROUP) {
                        throw new Exception("Unrecognized driver strategy for 
CoGoup driver: " + config.getDriverStrategy().name());
                }
+
+               final Counter numRecordsIn = 
this.taskContext.getMetricGroup().counter("numRecordsIn");
                
-               final MutableObjectIterator<IT1> in1 = 
this.taskContext.getInput(0);
-               final MutableObjectIterator<IT2> in2 = 
this.taskContext.getInput(1);
+               final MutableObjectIterator<IT1> in1 = new 
CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), 
numRecordsIn);
+               final MutableObjectIterator<IT2> in2 = new 
CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), 
numRecordsIn);
                
                // get the key positions and types
                final TypeSerializer<IT1> serializer1 = 
this.taskContext.<IT1>getInputSerializer(0).getSerializer();
@@ -144,8 +149,10 @@ public class CoGroupDriver<IT1, IT2, OT> implements 
Driver<CoGroupFunction<IT1,
        @Override
        public void run() throws Exception
        {
+               final Counter numRecordsOut = 
this.taskContext.getMetricGroup().counter("numRecordsOut");
+
                final CoGroupFunction<IT1, IT2, OT> coGroupStub = 
this.taskContext.getStub();
-               final Collector<OT> collector = 
this.taskContext.getOutputCollector();
+               final Collector<OT> collector = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
                final CoGroupTaskIterator<IT1, IT2> coGroupIterator = 
this.coGroupIterator;
                
                while (this.running && coGroupIterator.next()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
index c9d84b1..3e1d01f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
@@ -20,6 +20,9 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import 
org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.CrossFunction;
@@ -194,9 +197,12 @@ public class CrossDriver<T1, T2, OT> implements 
Driver<CrossFunction<T1, T2, OT>
                        LOG.debug(this.taskContext.formatLogString("Running 
Cross with Block-Nested-Loops: " +
                                        "First input is outer (blocking) side, 
second input is inner (spilling) side."));
                }
-                       
-               final MutableObjectIterator<T1> in1 = 
this.taskContext.getInput(0);
-               final MutableObjectIterator<T2> in2 = 
this.taskContext.getInput(1);
+
+               final Counter numRecordsIn = 
taskContext.getMetricGroup().counter("numRecordsIn");
+               final Counter numRecordsOut = 
taskContext.getMetricGroup().counter("numRecordsOut");
+
+               final MutableObjectIterator<T1> in1 = new 
CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
+               final MutableObjectIterator<T2> in2 = new 
CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
                
                final TypeSerializer<T1> serializer1 = 
this.taskContext.<T1>getInputSerializer(0).getSerializer();
                final TypeSerializer<T2> serializer2 = 
this.taskContext.<T2>getInputSerializer(1).getSerializer();
@@ -213,7 +219,7 @@ public class CrossDriver<T1, T2, OT> implements 
Driver<CrossFunction<T1, T2, OT>
                
 
                final CrossFunction<T1, T2, OT> crosser = 
this.taskContext.getStub();
-               final Collector<OT> collector = 
this.taskContext.getOutputCollector();
+               final Collector<OT> collector = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
 
                if (objectReuseEnabled) {
@@ -259,9 +265,12 @@ public class CrossDriver<T1, T2, OT> implements 
Driver<CrossFunction<T1, T2, OT>
                        LOG.debug(this.taskContext.formatLogString("Running 
Cross with Block-Nested-Loops: " +
                                        "First input is inner (spilling) side, 
second input is outer (blocking) side."));
                }
-               
-               final MutableObjectIterator<T1> in1 = 
this.taskContext.getInput(0);
-               final MutableObjectIterator<T2> in2 = 
this.taskContext.getInput(1);
+
+               final Counter numRecordsIn = 
taskContext.getMetricGroup().counter("numRecordsIn");
+               final Counter numRecordsOut = 
taskContext.getMetricGroup().counter("numRecordsOut");
+
+               final MutableObjectIterator<T1> in1 = new 
CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
+               final MutableObjectIterator<T2> in2 = new 
CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
                
                final TypeSerializer<T1> serializer1 = 
this.taskContext.<T1>getInputSerializer(0).getSerializer();
                final TypeSerializer<T2> serializer2 = 
this.taskContext.<T2>getInputSerializer(1).getSerializer();
@@ -277,7 +286,7 @@ public class CrossDriver<T1, T2, OT> implements 
Driver<CrossFunction<T1, T2, OT>
                this.blockIter = blockVals;
                
                final CrossFunction<T1, T2, OT> crosser = 
this.taskContext.getStub();
-               final Collector<OT> collector = 
this.taskContext.getOutputCollector();
+               final Collector<OT> collector = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
                if (objectReuseEnabled) {
                        final T1 val1Reuse = serializer1.createInstance();
@@ -322,9 +331,12 @@ public class CrossDriver<T1, T2, OT> implements 
Driver<CrossFunction<T1, T2, OT>
                        LOG.debug(this.taskContext.formatLogString("Running 
Cross with Nested-Loops: " +
                                        "First input is outer side, second 
input is inner (spilling) side."));
                }
-               
-               final MutableObjectIterator<T1> in1 = 
this.taskContext.getInput(0);
-               final MutableObjectIterator<T2> in2 = 
this.taskContext.getInput(1);
+
+               final Counter numRecordsIn = 
taskContext.getMetricGroup().counter("numRecordsIn");
+               final Counter numRecordsOut = 
taskContext.getMetricGroup().counter("numRecordsOut");
+
+               final MutableObjectIterator<T1> in1 = new 
CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
+               final MutableObjectIterator<T2> in2 = new 
CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
                
                final TypeSerializer<T1> serializer1 = 
this.taskContext.<T1>getInputSerializer(0).getSerializer();
                final TypeSerializer<T2> serializer2 = 
this.taskContext.<T2>getInputSerializer(1).getSerializer();
@@ -335,7 +347,7 @@ public class CrossDriver<T1, T2, OT> implements 
Driver<CrossFunction<T1, T2, OT>
                this.spillIter = spillVals;
                
                final CrossFunction<T1, T2, OT> crosser = 
this.taskContext.getStub();
-               final Collector<OT> collector = 
this.taskContext.getOutputCollector();
+               final Collector<OT> collector = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
                if (objectReuseEnabled) {
                        final T1 val1Reuse = serializer1.createInstance();
@@ -372,8 +384,12 @@ public class CrossDriver<T1, T2, OT> implements 
Driver<CrossFunction<T1, T2, OT>
                        LOG.debug(this.taskContext.formatLogString("Running 
Cross with Nested-Loops: " +
                                        "First input is inner (spilling) side, 
second input is outer side."));
                }
-               final MutableObjectIterator<T1> in1 = 
this.taskContext.getInput(0);
-               final MutableObjectIterator<T2> in2 = 
this.taskContext.getInput(1);
+
+               final Counter numRecordsIn = 
taskContext.getMetricGroup().counter("numRecordsIn");
+               final Counter numRecordsOut = 
taskContext.getMetricGroup().counter("numRecordsOut");
+
+               final MutableObjectIterator<T1> in1 = new 
CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
+               final MutableObjectIterator<T2> in2 = new 
CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
                
                final TypeSerializer<T1> serializer1 = 
this.taskContext.<T1>getInputSerializer(0).getSerializer();
                final TypeSerializer<T2> serializer2 = 
this.taskContext.<T2>getInputSerializer(1).getSerializer();
@@ -384,7 +400,7 @@ public class CrossDriver<T1, T2, OT> implements 
Driver<CrossFunction<T1, T2, OT>
                this.spillIter = spillVals;
 
                final CrossFunction<T1, T2, OT> crosser = 
this.taskContext.getStub();
-               final Collector<OT> collector = 
this.taskContext.getOutputCollector();
+               final Collector<OT> collector = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
                if (objectReuseEnabled) {
                        final T1 val1Reuse = serializer1.createInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 380edd4..b73c85a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.io.RichOutputFormat;
@@ -27,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -104,8 +106,11 @@ public class DataSinkTask<IT> extends AbstractInvokable {
                // --------------------------------------------------------------------
                LOG.debug(getLogString("Starting data sink operator"));
 
+               RuntimeContext ctx = createRuntimeContext();
+               final Counter numRecordsIn = 
ctx.getMetricGroup().counter("numRecordsIn");
+               
                
if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
-                       ((RichOutputFormat) 
this.format).setRuntimeContext(createRuntimeContext());
+                       ((RichOutputFormat) this.format).setRuntimeContext(ctx);
                        LOG.debug(getLogString("Rich Sink detected. 
Initializing runtime context."));
                }
 
@@ -174,6 +179,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 
                                // work!
                                while (!this.taskCanceled && ((record = 
input.next(record)) != null)) {
+                                       numRecordsIn.inc();
                                        format.writeRecord(record);
                                }
                        } else {
@@ -181,6 +187,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 
                                // work!
                                while (!this.taskCanceled && ((record = 
input.next()) != null)) {
+                                       numRecordsIn.inc();
                                        format.writeRecord(record);
                                }
                        }
@@ -349,8 +356,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 
                inputReader.setReporter(reporter);
                
-               
inputReader.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
-
                this.inputTypeSerializerFactory = 
this.config.getInputSerializer(0, getUserCodeClassLoader());
                @SuppressWarnings({ "rawtypes" })
                final MutableObjectIterator<?> iter = new 
ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer());

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 819b84f..c57f133 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -19,12 +19,14 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -35,6 +37,7 @@ import 
org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import 
org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,8 +100,12 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                // --------------------------------------------------------------------
                LOG.debug(getLogString("Starting data source operator"));
 
+               RuntimeContext ctx = createRuntimeContext();
+               Counter splitCounter = 
ctx.getMetricGroup().counter("numSplitsProcessed");
+               Counter numRecordsOut = 
ctx.getMetricGroup().counter("numRecordsOut");
+
                if 
(RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
-                       ((RichInputFormat) 
this.format).setRuntimeContext(createRuntimeContext());
+                       ((RichInputFormat) this.format).setRuntimeContext(ctx);
                        LOG.debug(getLogString("Rich Source detected. 
Initializing runtime context."));
                        ((RichInputFormat) this.format).openInputFormat();
                        LOG.debug(getLogString("Rich Source detected. Opening 
the InputFormat."));
@@ -135,7 +142,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                                LOG.debug(getLogString("Starting to read input 
from split " + split.toString()));
                                
                                try {
-                                       final Collector<OT> output = 
this.output;
+                                       final Collector<OT> output = new 
CountingCollector<>(this.output, numRecordsOut);
 
                                        if (objectReuseEnabled) {
                                                OT reuse = 
serializer.createInstance();
@@ -165,6 +172,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                                        // close. We close here such that a 
regular close throwing an exception marks a task as failed.
                                        format.close();
                                }
+                               splitCounter.inc();
                        } // end for all input splits
 
                        // close the collector. if it is a chaining task 
collector, it will close its chained tasks

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
index c29923b..5b4a6ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.slf4j.Logger;
@@ -83,22 +85,26 @@ public class FlatMapDriver<IT, OT> implements 
Driver<FlatMapFunction<IT, OT>, OT
 
        @Override
        public void run() throws Exception {
+               final Counter numRecordsIn = 
this.taskContext.getMetricGroup().counter("numRecordsIn");
+               final Counter numRecordsOut = 
this.taskContext.getMetricGroup().counter("numRecordsOut");
                // cache references on the stack
                final MutableObjectIterator<IT> input = 
this.taskContext.getInput(0);
                final FlatMapFunction<IT, OT> function = 
this.taskContext.getStub();
-               final Collector<OT> output = 
this.taskContext.getOutputCollector();
+               final Collector<OT> output = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
                if (objectReuseEnabled) {
                        IT record = 
this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
 
 
                        while (this.running && ((record = input.next(record)) 
!= null)) {
+                               numRecordsIn.inc();
                                function.flatMap(record, output);
                        }
                } else {
                        IT record;
 
                        while (this.running && ((record = input.next()) != 
null)) {
+                               numRecordsIn.inc();
                                function.flatMap(record, output);
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index d6825ac..ccd88ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -19,6 +19,9 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import 
org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -89,9 +92,11 @@ public class GroupReduceDriver<IT, OT> implements 
Driver<GroupReduceFunction<IT,
                if (config.getDriverStrategy() != 
DriverStrategy.SORTED_GROUP_REDUCE) {
                        throw new Exception("Unrecognized driver strategy for 
GroupReduce driver: " + config.getDriverStrategy().name());
                }
+               final Counter numRecordsIn = 
this.taskContext.getMetricGroup().counter("numRecordsIn");
+               
                this.serializer = 
this.taskContext.<IT>getInputSerializer(0).getSerializer();
                this.comparator = this.taskContext.getDriverComparator(0);
-               this.input = this.taskContext.getInput(0);
+               this.input = new 
CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn);
 
                ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
                this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
@@ -106,10 +111,11 @@ public class GroupReduceDriver<IT, OT> implements 
Driver<GroupReduceFunction<IT,
                if (LOG.isDebugEnabled()) {
                        
LOG.debug(this.taskContext.formatLogString("GroupReducer preprocessing done. 
Running GroupReducer code."));
                }
+               final Counter numRecordsOut = 
this.taskContext.getMetricGroup().counter("numRecordsOut");
 
                // cache references on the stack
                final GroupReduceFunction<IT, OT> stub = 
this.taskContext.getStub();
-               final Collector<OT> output = 
this.taskContext.getOutputCollector();
+               final Collector<OT> output = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
                
                if (objectReuseEnabled) {
                        final ReusingKeyGroupedIterator<IT> iter = new 
ReusingKeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index f7ad8d1..efb59a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import 
org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
@@ -34,6 +35,8 @@ import 
org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import 
org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -84,6 +87,8 @@ public class JoinDriver<IT1, IT2, OT> implements 
Driver<FlatJoinFunction<IT1, IT
        @Override
        public void prepare() throws Exception{
                final TaskConfig config = this.taskContext.getTaskConfig();
+
+               final Counter numRecordsIn = 
this.taskContext.getMetricGroup().counter("numRecordsIn");
                
                // obtain task manager's memory manager and I/O manager
                final MemoryManager memoryManager = 
this.taskContext.getMemoryManager();
@@ -96,8 +101,8 @@ public class JoinDriver<IT1, IT2, OT> implements 
Driver<FlatJoinFunction<IT1, IT
                // test minimum memory requirements
                final DriverStrategy ls = config.getDriverStrategy();
                
-               final MutableObjectIterator<IT1> in1 = 
this.taskContext.getInput(0);
-               final MutableObjectIterator<IT2> in2 = 
this.taskContext.getInput(1);
+               final MutableObjectIterator<IT1> in1 = new 
CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), 
numRecordsIn);
+               final MutableObjectIterator<IT2> in2 = new 
CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), 
numRecordsIn);
 
                // get the key positions and types
                final TypeSerializer<IT1> serializer1 = 
this.taskContext.<IT1>getInputSerializer(0).getSerializer();
@@ -209,8 +214,9 @@ public class JoinDriver<IT1, IT2, OT> implements 
Driver<FlatJoinFunction<IT1, IT
 
        @Override
        public void run() throws Exception {
+               final Counter numRecordsOut = 
this.taskContext.getMetricGroup().counter("numRecordsOut");
                final FlatJoinFunction<IT1, IT2, OT> joinStub = 
this.taskContext.getStub();
-               final Collector<OT> collector = 
this.taskContext.getOutputCollector();
+               final Collector<OT> collector = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
                final JoinTaskIterator<IT1, IT2, OT> joinIterator = 
this.joinIterator;
                
                while (this.running && joinIterator.callWithNextKey(joinStub, 
collector));

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
index eefe8e4..65f9061 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -78,15 +80,18 @@ public class MapDriver<IT, OT> implements 
Driver<MapFunction<IT, OT>, OT> {
 
        @Override
        public void run() throws Exception {
+               final Counter numRecordsIn = 
this.taskContext.getMetricGroup().counter("numRecordsIn");
+               final Counter numRecordsOut = 
this.taskContext.getMetricGroup().counter("numRecordsOut");
                // cache references on the stack
                final MutableObjectIterator<IT> input = 
this.taskContext.getInput(0);
                final MapFunction<IT, OT> function = this.taskContext.getStub();
-               final Collector<OT> output = 
this.taskContext.getOutputCollector();
+               final Collector<OT> output = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
                if (objectReuseEnabled) {
                        IT record = 
this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
        
                        while (this.running && ((record = input.next(record)) 
!= null)) {
+                               numRecordsIn.inc();
                                output.collect(function.map(record));
                        }
                }
@@ -94,6 +99,7 @@ public class MapDriver<IT, OT> implements 
Driver<MapFunction<IT, OT>, OT> {
                        IT record = null;
                        
                        while (this.running && ((record = input.next()) != 
null)) {
+                               numRecordsIn.inc();
                                output.collect(function.map(record));
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
index 8f245f0..3496e14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import 
org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.util.Collector;
@@ -83,10 +86,12 @@ public class MapPartitionDriver<IT, OT> implements 
Driver<MapPartitionFunction<I
 
        @Override
        public void run() throws Exception {
+               final Counter numRecordsIn = 
this.taskContext.getMetricGroup().counter("numRecordsIn");
+               final Counter numRecordsOut = 
this.taskContext.getMetricGroup().counter("numRecordsOut");
                // cache references on the stack
-               final MutableObjectIterator<IT> input = 
this.taskContext.getInput(0);
+               final MutableObjectIterator<IT> input = new 
CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn);
                final MapPartitionFunction<IT, OT> function = 
this.taskContext.getStub();
-               final Collector<OT> output = 
this.taskContext.getOutputCollector();
+               final Collector<OT> output = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
                if (objectReuseEnabled) {
                        final ReusingMutableToRegularIteratorWrapper<IT> inIter 
= new ReusingMutableToRegularIteratorWrapper<IT>(input, 
this.taskContext.<IT>getInputSerializer(0).getSerializer());

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java
index 9b08fad..802227a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java
@@ -61,6 +61,7 @@ public class NoOpChainedDriver<IT> extends ChainedDriver<IT, 
IT> {
        @Override
        public void collect(IT record) {
                try {
+                       this.numRecordsIn.inc();
                        this.outputCollector.collect(record);
                } catch (Exception ex) {
                        throw new 
ExceptionInChainedStubException(this.taskName, ex);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index 073a837..dd64b76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.slf4j.Logger;
@@ -75,18 +77,22 @@ public class NoOpDriver<T> implements 
Driver<AbstractRichFunction, T> {
        @Override
        public void run() throws Exception {
                // cache references on the stack
+               final Counter numRecordsIn = 
this.taskContext.getMetricGroup().counter("numRecordsIn");
+               final Counter numRecordsOut = 
this.taskContext.getMetricGroup().counter("numRecordsOut");
                final MutableObjectIterator<T> input = 
this.taskContext.getInput(0);
-               final Collector<T> output = 
this.taskContext.getOutputCollector();
+               final Collector<T> output = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
                if (objectReuseEnabled) {
                        T record = 
this.taskContext.<T>getInputSerializer(0).getSerializer().createInstance();
 
                        while (this.running && ((record = input.next(record)) 
!= null)) {
+                               numRecordsIn.inc();
                                output.collect(record);
                        }
                } else {
                        T record;
                        while (this.running && ((record = input.next()) != 
null)) {
+                               numRecordsIn.inc();
                                output.collect(record);
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 1ceeaf0..e1ce39e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -104,13 +106,15 @@ public class ReduceCombineDriver<T> implements 
Driver<ReduceFunction<T>, T> {
                if (this.taskContext.getTaskConfig().getDriverStrategy() != 
DriverStrategy.SORTED_PARTIAL_REDUCE) {
                        throw new Exception("Invalid strategy " + 
this.taskContext.getTaskConfig().getDriverStrategy() + " for reduce combiner.");
                }
+
+               final Counter numRecordsOut = 
this.taskContext.getMetricGroup().counter("numRecordsOut");
                
                // instantiate the serializer / comparator
                final TypeSerializerFactory<T> serializerFactory = 
this.taskContext.getInputSerializer(0);
                this.comparator = this.taskContext.getDriverComparator(0);
                this.serializer = serializerFactory.getSerializer();
                this.reducer = this.taskContext.getStub();
-               this.output = this.taskContext.getOutputCollector();
+               this.output = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
                MemoryManager memManager = this.taskContext.getMemoryManager();
                final int numMemoryPages = memManager.computeNumberOfPages(
@@ -140,6 +144,8 @@ public class ReduceCombineDriver<T> implements 
Driver<ReduceFunction<T>, T> {
                        LOG.debug("Combiner starting.");
                }
                
+               final Counter numRecordsIn = 
this.taskContext.getMetricGroup().counter("numRecordsIn");
+               
                final MutableObjectIterator<T> in = 
this.taskContext.getInput(0);
                final TypeSerializer<T> serializer = this.serializer;
                
@@ -147,6 +153,7 @@ public class ReduceCombineDriver<T> implements 
Driver<ReduceFunction<T>, T> {
                        T value = serializer.createInstance();
                
                        while (running && (value = in.next(value)) != null) {
+                               numRecordsIn.inc();
                                
                                // try writing to the sorter first
                                if (this.sorter.write(value)) {
@@ -166,6 +173,7 @@ public class ReduceCombineDriver<T> implements 
Driver<ReduceFunction<T>, T> {
                else {
                        T value;
                        while (running && (value = in.next()) != null) {
+                               numRecordsIn.inc();
 
                                // try writing to the sorter first
                                if (this.sorter.write(value)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index 3b7af6e..eb4f2f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -20,6 +20,8 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -106,6 +108,9 @@ public class ReduceDriver<T> implements 
Driver<ReduceFunction<T>, T> {
                        LOG.debug(this.taskContext.formatLogString("Reducer 
preprocessing done. Running Reducer code."));
                }
 
+               final Counter numRecordsIn = 
this.taskContext.getMetricGroup().counter("numRecordsIn");
+               final Counter numRecordsOut = 
this.taskContext.getMetricGroup().counter("numRecordsOut");
+
                // cache references on the stack
                final MutableObjectIterator<T> input = this.input;
                final TypeSerializer<T> serializer = this.serializer;
@@ -113,7 +118,7 @@ public class ReduceDriver<T> implements 
Driver<ReduceFunction<T>, T> {
                
                final ReduceFunction<T> function = this.taskContext.getStub();
                
-               final Collector<T> output = 
this.taskContext.getOutputCollector();
+               final Collector<T> output = new 
CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
                if (objectReuseEnabled) {
                        // We only need two objects. The first reference stores results and is
@@ -128,10 +133,12 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 
                        // iterate over key groups
                        while (this.running && value != null) {
+                               numRecordsIn.inc();
                                comparator.setReference(value);
 
                                // iterate within a key group
                                while ((reuse2 = input.next(reuse2)) != null) {
+                                       numRecordsIn.inc();
                                        if (comparator.equalToReference(reuse2)) {
                                                // same group, reduce
                                                value = function.reduce(value, reuse2);
@@ -163,11 +170,13 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 
                        // iterate over key groups
                        while (this.running && value != null) {
+                               numRecordsIn.inc();
                                comparator.setReference(value);
                                T res = value;
 
                                // iterate within a key group
                                while ((value = input.next()) != null) {
+                                       numRecordsIn.inc();
                                        if (comparator.equalToReference(value)) {
                                                // same group, reduce
                                                res = function.reduce(res, value);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
index 4791761..3d52925 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -58,18 +60,22 @@ public class UnionWithTempOperator<T> implements Driver<Function, T> {
 
        @Override
        public void run() throws Exception {
+               final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+               final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
                
-               final Collector<T> output = this.taskContext.getOutputCollector();
+               final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
                T reuse = this.taskContext.<T>getInputSerializer(STREAMED_INPUT).getSerializer().createInstance();
                T record;
                
                final MutableObjectIterator<T> input = this.taskContext.getInput(STREAMED_INPUT);
                while (this.running && ((record = input.next(reuse)) != null)) {
+                       numRecordsIn.inc();
                        output.collect(record);
                }
                
                final MutableObjectIterator<T> cache = this.taskContext.getInput(CACHED_INPUT);
                while (this.running && ((record = cache.next(reuse)) != null)) {
+                       numRecordsIn.inc();
                        output.collect(record);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
index 46ee41b..1e3482f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
@@ -86,6 +86,7 @@ public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> {
        // --------------------------------------------------------------------------------------------
        @Override
        public void collect(IT record) {
+               numRecordsIn.inc();
                try {
                        if (base == null) {
                                base = objectReuseEnabled ? record : serializer.copy(record);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index 407716f..2560135 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -22,12 +22,14 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.apache.flink.util.Collector;
 
 import java.util.Map;
@@ -53,6 +55,10 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
        protected boolean objectReuseEnabled = false;
        
        protected MetricGroup metrics;
+       
+       protected Counter numRecordsIn;
+       
+       protected Counter numRecordsOut;
 
        
        public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
@@ -61,9 +67,11 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
        {
                this.config = config;
                this.taskName = taskName;
-               this.outputCollector = outputCollector;
                this.userCodeClassLoader = userCodeClassLoader;
                this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
+               this.numRecordsIn = this.metrics.counter("numRecordsIn");
+               this.numRecordsOut = this.metrics.counter("numRecordsOut");
+               this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut);
 
                Environment env = parent.getEnvironment();
 
@@ -103,7 +111,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
 
        @SuppressWarnings("unchecked")
        public void setOutputCollector(Collector<?> outputCollector) {
-               this.outputCollector = (Collector<OT>) outputCollector;
+               this.outputCollector = new CountingCollector<>((Collector<OT>) outputCollector, numRecordsOut);
        }
 
        public Collector<OT> getOutputCollector() {

Reply via email to