TisonKun closed pull request #6643: [FLINK-10275] StreamTask support object reuse
URL: https://github.com/apache/flink/pull/6643
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/execution_configuration.md b/docs/dev/execution_configuration.md
index f0103b0f39f..3991ab59ac5 100644
--- a/docs/dev/execution_configuration.md
+++ b/docs/dev/execution_configuration.md
@@ -59,7 +59,7 @@ With the closure cleaner disabled, it might happen that an anonymous user functi
 
 - `enableForceAvro()` / **`disableForceAvro()`**. Avro is not forced by default. Forces the Flink AvroTypeInformation to use the Avro serializer instead of Kryo for serializing Avro POJOs.
 
-- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.
+- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-defined function of an operation is not aware of this behavior.
 
 - **`enableSysoutLogging()`** / `disableSysoutLogging()` JobManager status updates are printed to `System.out` by default. This setting allows to disable this behavior.
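
[Editor's note: for reference, the switch this PR keys off is the existing ExecutionConfig object-reuse flag described in the documentation hunk above. A minimal sketch of how a job opts in, assuming a standard StreamExecutionEnvironment setup; the pipeline below is illustrative only, not part of this PR:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Opt in to object reuse: the runtime may then hand the same instance to
		// user functions repeatedly, so functions must not keep references to
		// input records across invocations without copying them first.
		ExecutionConfig config = env.getConfig();
		config.enableObjectReuse();

		env.fromElements("a", "b", "c")
			.map(new MapFunction<String, String>() {
				@Override
				public String map(String value) {
					return value.toUpperCase();
				}
			})
			.print();

		env.execute("object-reuse-sketch");
	}
}

End of editor's note.]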
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 59fa803791a..d36fd296562 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -601,8 +601,8 @@ public boolean isForceAvroEnabled() {
 
        /**
         * Enables reusing objects that Flink internally uses for deserialization and passing
-        * data to user-code functions. Keep in mind that this can lead to bugs when the
-        * user-code function of an operation is not aware of this behaviour.
+        * data to user-defined functions. Keep in mind that this can lead to bugs when the
+        * user-defined function of an operation is not aware of this behaviour.
         */
        public ExecutionConfig enableObjectReuse() {
                objectReuse = true;
@@ -611,7 +611,7 @@ public ExecutionConfig enableObjectReuse() {
 
        /**
         * Disables reusing objects that Flink internally uses for deserialization and passing
-        * data to user-code functions. @see #enableObjectReuse()
+        * data to user-defined functions. @see #enableObjectReuse()
         */
        public ExecutionConfig disableObjectReuse() {
                objectReuse = false;
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
index d8ba29ae478..f4185f4c129 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
@@ -51,7 +51,7 @@ class TableSinkITCase(
     val input = CollectionDataSets.get3TupleDataSet(env)
       .map(x => x).setParallelism(4) // increase DOP to 4
 
-    val results = input.toTable(tEnv, 'a, 'b, 'c)
+    input.toTable(tEnv, 'a, 'b, 'c)
       .where('a < 5 || 'a > 17)
       .select('c, 'b)
       .writeToSink(new CsvTableSink(path, fieldDelim = "|"))
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 70e59f3d24d..95cb1df1b04 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -21,10 +21,12 @@ package org.apache.flink.table.runtime.stream.table
 import java.io.File
 import java.lang.{Boolean => JBool}
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
@@ -54,7 +56,7 @@ class TableSinkITCase extends AbstractTestBase {
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    MemoryTableSourceSinkUtil.clear
+    MemoryTableSourceSinkUtil.clear()
 
     val input = StreamTestData.get3TupleDataStream(env)
       .assignAscendingTimestamps(r => r._2)
@@ -560,8 +562,7 @@ private[flink] class TestAppendSink extends AppendStreamTableSink[Row] {
   var fTypes: Array[TypeInformation[_]] = _
 
   override def emitDataStream(s: DataStream[Row]): Unit = {
-    s.map(
-      new MapFunction[Row, JTuple2[JBool, Row]] {
+    s.map(new MapFunction[Row, JTuple2[JBool, Row]] {
        override def map(value: Row): JTuple2[JBool, Row] = new JTuple2(true, value)
       })
       .addSink(new RowSink)
@@ -661,12 +662,11 @@ object RowCollector {
     new mutable.ArrayBuffer[JTuple2[JBool, Row]]()
 
   def addValue(value: JTuple2[JBool, Row]): Unit = {
+    // make a deep copy
+    val copy = new JTuple2[JBool, Row](value.f0,
+      new KryoSerializer(classOf[Row], new ExecutionConfig()).copy(value.f1))
 
-    // make a copy
-    val copy = new JTuple2[JBool, Row](value.f0, Row.copy(value.f1))
-    sink.synchronized {
-      sink += copy
-    }
+    sink.synchronized { sink += copy }
   }
 
   def getAndClearValues: List[JTuple2[JBool, Row]] = {
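
[Editor's note: the RowCollector change above is the pattern user code needs once object reuse is on: because the runtime may recycle the same Row instance for every incoming record, anything that buffers records has to take a deep copy first, which is why the shallow Row.copy is replaced with a serializer-based copy. A minimal standalone sketch of that copy, using the same KryoSerializer call as the updated test:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.types.Row;

public class DeepCopyExample {

	public static void main(String[] args) {
		// Serializer-based deep copy, mirroring the updated RowCollector.addValue.
		KryoSerializer<Row> serializer = new KryoSerializer<>(Row.class, new ExecutionConfig());

		Row reused = Row.of("Ciao", 1);          // instance the runtime might reuse
		Row buffered = serializer.copy(reused);  // independent copy, safe to keep

		reused.setField(1, 2);                   // the runtime overwrites the reused instance
		System.out.println(buffered);            // still prints Ciao,1
	}
}

End of editor's note.]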
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index a9c64b5f6fe..769605ba704 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -36,6 +36,7 @@
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -109,6 +110,8 @@
 
        private boolean isFinished;
 
+       private IN reusedObject;
+
        @SuppressWarnings("unchecked")
        public StreamInputProcessor(
                        InputGate[] inputGates,
@@ -121,7 +124,8 @@ public StreamInputProcessor(
                        StreamStatusMaintainer streamStatusMaintainer,
                        OneInputStreamOperator<IN, ?> streamOperator,
                        TaskIOMetricGroup metrics,
-                       WatermarkGauge watermarkGauge) throws IOException {
+                       WatermarkGauge watermarkGauge,
+                       boolean objectReuse) throws IOException {
 
                InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
@@ -131,7 +135,10 @@ public StreamInputProcessor(
                this.lock = checkNotNull(lock);
 
                StreamElementSerializer<IN> ser = new StreamElementSerializer<>(inputSerializer);
-               this.deserializationDelegate = new NonReusingDeserializationDelegate<>(ser);
+               this.deserializationDelegate = objectReuse ?
+                       new ReusingDeserializationDelegate<>(ser) :
+                       new NonReusingDeserializationDelegate<>(ser);
+               this.reusedObject = objectReuse ? inputSerializer.createInstance() : null;
 
                // Initialize one deserializer per input channel
                this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
@@ -169,6 +176,10 @@ public boolean processInput() throws Exception {
 
                while (true) {
                        if (currentRecordDeserializer != null) {
+                               if (reusedObject != null){
+                                       deserializationDelegate.setInstance(new StreamRecord<>(reusedObject));
+                               }
+
                                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
 
                                if (result.isBufferConsumed()) {
@@ -194,6 +205,8 @@ public boolean processInput() throws Exception {
                                                }
                                                continue;
                                        } else {
+                                               reusedObject = ((StreamRecord<IN>) recordOrMark).getValue();
+
                                                // now we can do the actual processing
                                                StreamRecord<IN> record = recordOrMark.asRecord();
                                                synchronized (lock) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ab4f90dcf23..531d6cbf28c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -36,6 +36,7 @@
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -125,6 +126,9 @@
 
        private boolean isFinished;
 
+       private IN1 reusedObject1;
+       private IN2 reusedObject2;
+
        @SuppressWarnings("unchecked")
        public StreamTwoInputProcessor(
                        Collection<InputGate> inputGates1,
@@ -140,7 +144,8 @@ public StreamTwoInputProcessor(
                        TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
                        TaskIOMetricGroup metrics,
                        WatermarkGauge input1WatermarkGauge,
-                       WatermarkGauge input2WatermarkGauge) throws IOException {
+                       WatermarkGauge input2WatermarkGauge,
+                       boolean objectReuse) throws IOException {
 
                final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
 
@@ -150,10 +155,17 @@ public StreamTwoInputProcessor(
                this.lock = checkNotNull(lock);
 
                StreamElementSerializer<IN1> ser1 = new StreamElementSerializer<>(inputSerializer1);
-               this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(ser1);
+               this.deserializationDelegate1 = objectReuse ?
+                       new ReusingDeserializationDelegate<>(ser1) :
+                       new NonReusingDeserializationDelegate<>(ser1);
+               this.reusedObject1 = objectReuse ? inputSerializer1.createInstance() : null;
 
                StreamElementSerializer<IN2> ser2 = new StreamElementSerializer<>(inputSerializer2);
-               this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(ser2);
+               this.deserializationDelegate2 = objectReuse ?
+                       new ReusingDeserializationDelegate<>(ser2) :
+                       new NonReusingDeserializationDelegate<>(ser2);
+
+               this.reusedObject2 = objectReuse ? inputSerializer2.createInstance() : null;
 
                // Initialize one deserializer per input channel
                this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
@@ -203,8 +215,14 @@ public boolean processInput() throws Exception {
                        if (currentRecordDeserializer != null) {
                                DeserializationResult result;
                                if (currentChannel < numInputChannels1) {
+                                       if (reusedObject1 != null){
+                                               deserializationDelegate1.setInstance(new StreamRecord<>(reusedObject1));
+                                       }
                                        result = currentRecordDeserializer.getNextRecord(deserializationDelegate1);
                                } else {
+                                       if (reusedObject2 != null){
+                                               deserializationDelegate2.setInstance(new StreamRecord<>(reusedObject2));
+                                       }
                                        result = currentRecordDeserializer.getNextRecord(deserializationDelegate2);
                                }
 
@@ -231,6 +249,8 @@ else if (recordOrWatermark.isLatencyMarker()) {
                                                        continue;
                                                }
                                                else {
+                                                       reusedObject1 = ((StreamRecord<IN1>) recordOrWatermark).getValue();
+
                                                        StreamRecord<IN1> record = recordOrWatermark.asRecord();
                                                        synchronized (lock) {
                                                                numRecordsIn.inc();
@@ -258,6 +278,8 @@ else if (recordOrWatermark.isLatencyMarker()) {
                                                        continue;
                                                }
                                                else {
+                                                       reusedObject2 = ((StreamRecord<IN2>) recordOrWatermark).getValue();
+
                                                        StreamRecord<IN2> record = recordOrWatermark.asRecord();
                                                        synchronized (lock) {
                                                                numRecordsIn.inc();
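
[Editor's note: in both the one-input and two-input processors the pattern is the same: when object reuse is enabled, the previously emitted value is handed back to the deserialization delegate before the next read, and whatever was just deserialized becomes the reuse candidate for the record after it. A simplified, self-contained paraphrase of that loop follows; the names below are illustrative stand-ins, not the actual Flink classes:

import java.util.Iterator;
import java.util.function.Consumer;

// Illustrative stand-in for the reuse bookkeeping in the stream input processors.
final class ReuseLoopSketch<T> {

	// Stand-in for a record source that can optionally refill a reused instance.
	interface Decoder<R> {
		R decode(R reuse); // may mutate and return 'reuse', or allocate when reuse == null
	}

	private final boolean objectReuse;
	private T reused;

	ReuseLoopSketch(boolean objectReuse, T initialInstance) {
		this.objectReuse = objectReuse;
		this.reused = objectReuse ? initialInstance : null;
	}

	void process(Iterator<Decoder<T>> records, Consumer<T> userFunction) {
		while (records.hasNext()) {
			// With reuse enabled, offer the previously emitted instance back to the decoder.
			T value = records.next().decode(objectReuse ? reused : null);
			userFunction.accept(value);
			// Whatever was just handed to the user becomes the next reuse candidate.
			if (objectReuse) {
				reused = value;
			}
		}
	}
}

End of editor's note.]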
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index ed6022ff592..8503eb21d70 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -225,15 +225,15 @@ else if (tag == TAG_LATENCY_MARKER) {
        public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException {
                int tag = source.readByte();
                if (tag == TAG_REC_WITH_TIMESTAMP) {
-                       long timestamp = source.readLong();
-                       T value = typeSerializer.deserialize(source);
                        StreamRecord<T> reuseRecord = reuse.asRecord();
+                       long timestamp = source.readLong();
+                       T value = typeSerializer.deserialize(reuseRecord.getValue(), source);
                        reuseRecord.replace(value, timestamp);
                        return reuseRecord;
                }
                else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
-                       T value = typeSerializer.deserialize(source);
                        StreamRecord<T> reuseRecord = reuse.asRecord();
+                       T value = typeSerializer.deserialize(reuseRecord.getValue(), source);
                        reuseRecord.replace(value);
                        return reuseRecord;
                }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 74985186836..91671be0a5c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -90,7 +90,8 @@ public void init() throws Exception {
                                        getStreamStatusMaintainer(),
                                        this.headOperator,
                                        getEnvironment().getMetricGroup().getIOMetricGroup(),
-                                       inputWatermarkGauge);
+                                       inputWatermarkGauge,
+                                       getExecutionConfig().isObjectReuseEnabled());
                }
                headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
                // wrap watermark gauge since registered metrics must be unique
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 546ccdb3bfc..1199251dd44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -100,7 +100,8 @@ public void init() throws Exception {
                                this.headOperator,
                                getEnvironment().getMetricGroup().getIOMetricGroup(),
                                input1WatermarkGauge,
-                               input2WatermarkGauge);
+                               input2WatermarkGauge,
+                               getExecutionConfig().isObjectReuseEnabled());
 
                headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge);
                headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 96eaa78ed1a..94fdd0fc1b5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -24,9 +24,13 @@
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.Counter;
@@ -79,6 +83,7 @@
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -108,20 +113,20 @@ public void testOpenCloseAndTimestamps() throws Exception 
{
                testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
-               StreamMap<String, String> mapOperator = new StreamMap<String, 
String>(new TestOpenCloseMapFunction());
+               StreamMap<String, String> mapOperator = new StreamMap<>(new 
TestOpenCloseMapFunction());
                streamConfig.setStreamOperator(mapOperator);
                streamConfig.setOperatorID(new OperatorID());
 
                long initialTime = 0L;
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
                testHarness.invoke();
                testHarness.waitForTaskRunning();
 
-               testHarness.processElement(new StreamRecord<String>("Hello", 
initialTime + 1));
-               testHarness.processElement(new StreamRecord<String>("Ciao", 
initialTime + 2));
-               expectedOutput.add(new StreamRecord<String>("Hello", 
initialTime + 1));
-               expectedOutput.add(new StreamRecord<String>("Ciao", initialTime 
+ 2));
+               testHarness.processElement(new StreamRecord<>("Hello", 
initialTime + 1));
+               testHarness.processElement(new StreamRecord<>("Ciao", 
initialTime + 2));
+               expectedOutput.add(new StreamRecord<>("Hello", initialTime + 
1));
+               expectedOutput.add(new StreamRecord<>("Ciao", initialTime + 2));
 
                testHarness.endInput();
 
@@ -152,11 +157,11 @@ public void testWatermarkAndStreamStatusForwarding() 
throws Exception {
                testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
-               StreamMap<String, String> mapOperator = new StreamMap<String, 
String>(new IdentityMap());
+               StreamMap<String, String> mapOperator = new StreamMap<>(new 
IdentityMap());
                streamConfig.setStreamOperator(mapOperator);
                streamConfig.setOperatorID(new OperatorID());
 
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
                long initialTime = 0L;
 
                testHarness.invoke();
@@ -180,10 +185,10 @@ public void testWatermarkAndStreamStatusForwarding() 
throws Exception {
                        testHarness.getOutput());
 
                // contrary to checkpoint barriers these elements are not 
blocked by watermarks
-               testHarness.processElement(new StreamRecord<String>("Hello", 
initialTime));
-               testHarness.processElement(new StreamRecord<String>("Ciao", 
initialTime));
-               expectedOutput.add(new StreamRecord<String>("Hello", 
initialTime));
-               expectedOutput.add(new StreamRecord<String>("Ciao", 
initialTime));
+               testHarness.processElement(new StreamRecord<>("Hello", 
initialTime));
+               testHarness.processElement(new StreamRecord<>("Ciao", 
initialTime));
+               expectedOutput.add(new StreamRecord<>("Hello", initialTime));
+               expectedOutput.add(new StreamRecord<>("Ciao", initialTime));
 
                testHarness.processElement(new Watermark(initialTime + 4), 0, 
0);
                testHarness.processElement(new Watermark(initialTime + 3), 0, 
1);
@@ -274,7 +279,7 @@ public void testWatermarksNotForwardedWithinChainWhenIdle() 
throws Exception {
 
                // --------------------- begin test ---------------------
 
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
                testHarness.invoke();
                testHarness.waitForTaskRunning();
@@ -364,11 +369,11 @@ public void testCheckpointBarriers() throws Exception {
                testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
-               StreamMap<String, String> mapOperator = new StreamMap<String, 
String>(new IdentityMap());
+               StreamMap<String, String> mapOperator = new StreamMap<>(new 
IdentityMap());
                streamConfig.setStreamOperator(mapOperator);
                streamConfig.setOperatorID(new OperatorID());
 
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
                long initialTime = 0L;
 
                testHarness.invoke();
@@ -378,16 +383,16 @@ public void testCheckpointBarriers() throws Exception {
 
                // These elements should be buffered until we receive barriers 
from
                // all inputs
-               testHarness.processElement(new 
StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
-               testHarness.processElement(new StreamRecord<String>("Ciao-0-0", 
initialTime), 0, 0);
+               testHarness.processElement(new StreamRecord<>("Hello-0-0", 
initialTime), 0, 0);
+               testHarness.processElement(new StreamRecord<>("Ciao-0-0", 
initialTime), 0, 0);
 
                // These elements should be forwarded, since we did not yet 
receive a checkpoint barrier
                // on that input, only add to same input, otherwise we would 
not know the ordering
                // of the output since the Task might read the inputs in any 
order
-               testHarness.processElement(new 
StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
-               testHarness.processElement(new StreamRecord<String>("Ciao-1-1", 
initialTime), 1, 1);
-               expectedOutput.add(new StreamRecord<String>("Hello-1-1", 
initialTime));
-               expectedOutput.add(new StreamRecord<String>("Ciao-1-1", 
initialTime));
+               testHarness.processElement(new StreamRecord<>("Hello-1-1", 
initialTime), 1, 1);
+               testHarness.processElement(new StreamRecord<>("Ciao-1-1", 
initialTime), 1, 1);
+               expectedOutput.add(new StreamRecord<>("Hello-1-1", 
initialTime));
+               expectedOutput.add(new StreamRecord<>("Ciao-1-1", initialTime));
 
                testHarness.waitForInputProcessing();
                // we should not yet see the barrier, only the two elements 
from non-blocked input
@@ -401,8 +406,8 @@ public void testCheckpointBarriers() throws Exception {
 
                // now we should see the barrier and after that the buffered 
elements
                expectedOutput.add(new CheckpointBarrier(0, 0, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
-               expectedOutput.add(new StreamRecord<String>("Hello-0-0", 
initialTime));
-               expectedOutput.add(new StreamRecord<String>("Ciao-0-0", 
initialTime));
+               expectedOutput.add(new StreamRecord<>("Hello-0-0", 
initialTime));
+               expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));
 
                testHarness.endInput();
 
@@ -427,11 +432,11 @@ public void testOvertakingCheckpointBarriers() throws 
Exception {
                testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
-               StreamMap<String, String> mapOperator = new StreamMap<String, 
String>(new IdentityMap());
+               StreamMap<String, String> mapOperator = new StreamMap<>(new 
IdentityMap());
                streamConfig.setStreamOperator(mapOperator);
                streamConfig.setOperatorID(new OperatorID());
 
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
                long initialTime = 0L;
 
                testHarness.invoke();
@@ -441,16 +446,16 @@ public void testOvertakingCheckpointBarriers() throws 
Exception {
 
                // These elements should be buffered until we receive barriers 
from
                // all inputs
-               testHarness.processElement(new 
StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
-               testHarness.processElement(new StreamRecord<String>("Ciao-0-0", 
initialTime), 0, 0);
+               testHarness.processElement(new StreamRecord<>("Hello-0-0", 
initialTime), 0, 0);
+               testHarness.processElement(new StreamRecord<>("Ciao-0-0", 
initialTime), 0, 0);
 
                // These elements should be forwarded, since we did not yet 
receive a checkpoint barrier
                // on that input, only add to same input, otherwise we would 
not know the ordering
                // of the output since the Task might read the inputs in any 
order
-               testHarness.processElement(new 
StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
-               testHarness.processElement(new StreamRecord<String>("Ciao-1-1", 
initialTime), 1, 1);
-               expectedOutput.add(new StreamRecord<String>("Hello-1-1", 
initialTime));
-               expectedOutput.add(new StreamRecord<String>("Ciao-1-1", 
initialTime));
+               testHarness.processElement(new StreamRecord<>("Hello-1-1", 
initialTime), 1, 1);
+               testHarness.processElement(new StreamRecord<>("Ciao-1-1", 
initialTime), 1, 1);
+               expectedOutput.add(new StreamRecord<>("Hello-1-1", 
initialTime));
+               expectedOutput.add(new StreamRecord<>("Ciao-1-1", initialTime));
 
                testHarness.waitForInputProcessing();
                // we should not yet see the barrier, only the two elements 
from non-blocked input
@@ -464,8 +469,8 @@ public void testOvertakingCheckpointBarriers() throws 
Exception {
                testHarness.processEvent(new CheckpointBarrier(1, 1, 
CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
 
                expectedOutput.add(new CancelCheckpointMarker(0));
-               expectedOutput.add(new StreamRecord<String>("Hello-0-0", 
initialTime));
-               expectedOutput.add(new StreamRecord<String>("Ciao-0-0", 
initialTime));
+               expectedOutput.add(new StreamRecord<>("Hello-0-0", 
initialTime));
+               expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));
                expectedOutput.add(new CheckpointBarrier(1, 1, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
 
                testHarness.waitForInputProcessing();
@@ -759,6 +764,95 @@ public void processWatermark(Watermark mark) throws 
Exception {
                }
        }
 
+       @Test
+       public void testMutableObjectReuse() throws Exception {
+               final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(
+                       OneInputStreamTask::new,
+                       new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO),
+                       new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO));
+
+               testHarness.setupOperatorChain(new OperatorID(), new 
TestMutableObjectReuseOperator(true))
+                       .chain(new OperatorID(), new 
TestMutableObjectReuseOperator(),
+                               new TupleSerializer(Tuple2.class,
+                                       new TypeSerializer<?>[]{
+                                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                                               
BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())}))
+                       .finish();
+
+               ExecutionConfig executionConfig = 
testHarness.getExecutionConfig();
+               executionConfig.enableObjectReuse();
+
+               long initialTime = 0L;
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
+
+               testHarness.invoke();
+               testHarness.waitForTaskRunning();
+
+               testHarness.processElement(new 
StreamRecord<>(Tuple2.of("Hello", 1)));
+               testHarness.processElement(new 
StreamRecord<>(Tuple2.of("Hello", 2), initialTime + 1));
+               testHarness.processElement(new Watermark(initialTime + 1));
+               testHarness.processElement(new StreamRecord<>(Tuple2.of("Ciao", 
1), initialTime + 2));
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+               testHarness.processElement(new StreamRecord<>(Tuple2.of("Ciao", 
2), initialTime + 3));
+
+               expectedOutput.add(new StreamRecord<>(Tuple2.of("Hello", 1)));
+               expectedOutput.add(new StreamRecord<>(Tuple2.of("Hello", 2), 
initialTime + 1));
+               expectedOutput.add(new Watermark(initialTime + 1));
+               expectedOutput.add(new StreamRecord<>(Tuple2.of("Ciao", 1), 
initialTime + 2));
+               expectedOutput.add(new CheckpointBarrier(0, 0, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+               expectedOutput.add(new StreamRecord<>(Tuple2.of("Ciao", 2), 
initialTime + 3));
+
+               testHarness.endInput();
+
+               testHarness.waitForTaskCompletion();
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.",
+                       expectedOutput,
+                       testHarness.getOutput());
+       }
+
+       // This must only be used in one test, otherwise the static fields will 
be changed
+       // by several tests concurrently
+       private static class TestMutableObjectReuseOperator
+               extends AbstractStreamOperator<Tuple2<String, Integer>>
+               implements OneInputStreamOperator<Tuple2<String, Integer>, 
Tuple2<String, Integer>> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final boolean isHeadOperator;
+               private static Object headOperatorValue;
+
+               private Object prevRecord = null;
+               private Object prevValue = null;
+
+               public TestMutableObjectReuseOperator() {
+                       this.isHeadOperator = false;
+               }
+
+               public TestMutableObjectReuseOperator(boolean isHeadOperator) {
+                       this.isHeadOperator = isHeadOperator;
+               }
+
+               @Override
+               public void processElement(StreamRecord<Tuple2<String, 
Integer>> element) throws Exception {
+                       if (isHeadOperator) {
+                               if (prevRecord != null) {
+                                       assertNotEquals("Reuse StreamRecord 
object in the head operator.", prevRecord, element);
+                                       assertEquals("No reuse value object in 
the head operator.", prevValue, element.getValue());
+                               }
+
+                               prevRecord = element;
+                               prevValue = element.getValue();
+
+                               headOperatorValue = element.getValue();
+                       } else {
+                               assertEquals("No reuse value object in chain.", 
headOperatorValue, element.getValue());
+                       }
+
+                       output.collect(element);
+               }
+       }
+
        
//==============================================================================================
        // Utility functions and classes
        
//==============================================================================================
@@ -807,7 +901,7 @@ private void configureChainedTestingStreamOperator(
                                        null
                                ),
                                0,
-                               Collections.<String>emptyList(),
+                               Collections.emptyList(),
                                null,
                                null
                        );
@@ -973,12 +1067,16 @@ protected void handleWatermark(Watermark mark) {
                public void processElement(StreamRecord<String> element) throws 
Exception {
                        output.collect(element);
 
-                       if 
(element.getValue().equals(EXPECT_FORWARDED_WATERMARKS_MARKER)) {
-                               this.expectForwardedWatermarks = true;
-                       } else if 
(element.getValue().equals(NO_FORWARDED_WATERMARKS_MARKER)) {
-                               this.expectForwardedWatermarks = false;
-                       } else {
-                               handleElement(element);
+                       switch (element.getValue()) {
+                               case EXPECT_FORWARDED_WATERMARKS_MARKER:
+                                       this.expectForwardedWatermarks = true;
+                                       break;
+                               case NO_FORWARDED_WATERMARKS_MARKER:
+                                       this.expectForwardedWatermarks = false;
+                                       break;
+                               default:
+                                       handleElement(element);
+                                       break;
                        }
                }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 5d151573582..e80fdc17154 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,10 @@
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -40,6 +44,7 @@
 import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -56,6 +61,8 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for {@link 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. Theses tests
@@ -83,31 +90,31 @@ public void testOpenCloseAndTimestamps() throws Exception {
                testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
-               CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction());
+               CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<>(new TestOpenCloseMapFunction());
                streamConfig.setStreamOperator(coMapOperator);
                streamConfig.setOperatorID(new OperatorID());
 
                long initialTime = 0L;
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
                testHarness.invoke();
                testHarness.waitForTaskRunning();
 
-               testHarness.processElement(new StreamRecord<String>("Hello", 
initialTime + 1), 0, 0);
-               expectedOutput.add(new StreamRecord<String>("Hello", 
initialTime + 1));
+               testHarness.processElement(new StreamRecord<>("Hello", 
initialTime + 1), 0, 0);
+               expectedOutput.add(new StreamRecord<>("Hello", initialTime + 
1));
 
                // wait until the input is processed to ensure ordering of the 
output
                testHarness.waitForInputProcessing();
 
-               testHarness.processElement(new StreamRecord<Integer>(1337, 
initialTime + 2), 1, 0);
+               testHarness.processElement(new StreamRecord<>(1337, initialTime 
+ 2), 1, 0);
 
-               expectedOutput.add(new StreamRecord<String>("1337", initialTime 
+ 2));
+               expectedOutput.add(new StreamRecord<>("1337", initialTime + 2));
 
                testHarness.endInput();
 
                testHarness.waitForTaskCompletion();
 
-               Assert.assertTrue("RichFunction methods where not called.", 
TestOpenCloseMapFunction.closeCalled);
+               assertTrue("RichFunction methods where not called.", 
TestOpenCloseMapFunction.closeCalled);
 
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
        }
@@ -122,7 +129,7 @@ public void testOpenCloseAndTimestamps() throws Exception {
        public void testWatermarkAndStreamStatusForwarding() throws Exception {
 
                final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness =
-                       new TwoInputStreamTaskTestHarness<String, Integer, 
String>(
+                       new TwoInputStreamTaskTestHarness<>(
                                TwoInputStreamTask::new,
                                2, 2, new int[] {1, 2},
                                BasicTypeInfo.STRING_TYPE_INFO,
@@ -131,11 +138,11 @@ public void testWatermarkAndStreamStatusForwarding() 
throws Exception {
                testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
-               CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<String, Integer, String>(new IdentityMap());
+               CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<>(new IdentityMap());
                streamConfig.setStreamOperator(coMapOperator);
                streamConfig.setOperatorID(new OperatorID());
 
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
                long initialTime = 0L;
 
                testHarness.invoke();
@@ -158,10 +165,10 @@ public void testWatermarkAndStreamStatusForwarding() 
throws Exception {
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
 
                // contrary to checkpoint barriers these elements are not 
blocked by watermarks
-               testHarness.processElement(new StreamRecord<String>("Hello", 
initialTime), 0, 0);
-               testHarness.processElement(new StreamRecord<Integer>(42, 
initialTime), 1, 1);
-               expectedOutput.add(new StreamRecord<String>("Hello", 
initialTime));
-               expectedOutput.add(new StreamRecord<String>("42", initialTime));
+               testHarness.processElement(new StreamRecord<>("Hello", 
initialTime), 0, 0);
+               testHarness.processElement(new StreamRecord<>(42, initialTime), 
1, 1);
+               expectedOutput.add(new StreamRecord<>("Hello", initialTime));
+               expectedOutput.add(new StreamRecord<>("42", initialTime));
 
                testHarness.waitForInputProcessing();
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
@@ -234,18 +241,18 @@ public void testWatermarkAndStreamStatusForwarding() 
throws Exception {
        @SuppressWarnings("unchecked")
        public void testCheckpointBarriers() throws Exception {
                final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness =
-                               new TwoInputStreamTaskTestHarness<String, 
Integer, String>(
+                               new TwoInputStreamTaskTestHarness<>(
                                                TwoInputStreamTask::new,
                                                2, 2, new int[] {1, 2},
                                                BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
                testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
-               CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<String, Integer, String>(new IdentityMap());
+               CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<>(new IdentityMap());
                streamConfig.setStreamOperator(coMapOperator);
                streamConfig.setOperatorID(new OperatorID());
 
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
                long initialTime = 0L;
 
                testHarness.invoke();
@@ -255,21 +262,21 @@ public void testCheckpointBarriers() throws Exception {
 
                // This element should be buffered since we received a 
checkpoint barrier on
                // this input
-               testHarness.processElement(new 
StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
+               testHarness.processElement(new StreamRecord<>("Hello-0-0", 
initialTime), 0, 0);
 
                // This one should go through
-               testHarness.processElement(new StreamRecord<String>("Ciao-0-0", 
initialTime), 0, 1);
-               expectedOutput.add(new StreamRecord<String>("Ciao-0-0", 
initialTime));
+               testHarness.processElement(new StreamRecord<>("Ciao-0-0", 
initialTime), 0, 1);
+               expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));
 
                testHarness.waitForInputProcessing();
 
                // These elements should be forwarded, since we did not yet 
receive a checkpoint barrier
                // on that input, only add to same input, otherwise we would 
not know the ordering
                // of the output since the Task might read the inputs in any 
order
-               testHarness.processElement(new StreamRecord<Integer>(11, 
initialTime), 1, 1);
-               testHarness.processElement(new StreamRecord<Integer>(111, 
initialTime), 1, 1);
-               expectedOutput.add(new StreamRecord<String>("11", initialTime));
-               expectedOutput.add(new StreamRecord<String>("111", 
initialTime));
+               testHarness.processElement(new StreamRecord<>(11, initialTime), 
1, 1);
+               testHarness.processElement(new StreamRecord<>(111, 
initialTime), 1, 1);
+               expectedOutput.add(new StreamRecord<>("11", initialTime));
+               expectedOutput.add(new StreamRecord<>("111", initialTime));
 
                testHarness.waitForInputProcessing();
 
@@ -298,7 +305,7 @@ public void testCheckpointBarriers() throws Exception {
 
                // now we should see the barrier and after that the buffered 
elements
                expectedOutput.add(new CheckpointBarrier(0, 0, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
-               expectedOutput.add(new StreamRecord<String>("Hello-0-0", 
initialTime));
+               expectedOutput.add(new StreamRecord<>("Hello-0-0", 
initialTime));
 
                TestHarnessUtil.assertOutputEquals("Output was not correct.",
                                expectedOutput,
@@ -326,11 +333,11 @@ public void testOvertakingCheckpointBarriers() throws 
Exception {
                testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
-               CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<String, Integer, String>(new IdentityMap());
+               CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<>(new IdentityMap());
                streamConfig.setStreamOperator(coMapOperator);
                streamConfig.setOperatorID(new OperatorID());
 
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
                long initialTime = 0L;
 
                testHarness.invoke();
@@ -340,16 +347,16 @@ public void testOvertakingCheckpointBarriers() throws 
Exception {
 
                // These elements should be buffered until we receive barriers 
from
                // all inputs
-               testHarness.processElement(new 
StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
-               testHarness.processElement(new StreamRecord<String>("Ciao-0-0", 
initialTime), 0, 0);
+               testHarness.processElement(new StreamRecord<>("Hello-0-0", 
initialTime), 0, 0);
+               testHarness.processElement(new StreamRecord<>("Ciao-0-0", 
initialTime), 0, 0);
 
                // These elements should be forwarded, since we did not yet 
receive a checkpoint barrier
                // on that input, only add to same input, otherwise we would 
not know the ordering
                // of the output since the Task might read the inputs in any 
order
-               testHarness.processElement(new StreamRecord<Integer>(42, 
initialTime), 1, 1);
-               testHarness.processElement(new StreamRecord<Integer>(1337, 
initialTime), 1, 1);
-               expectedOutput.add(new StreamRecord<String>("42", initialTime));
-               expectedOutput.add(new StreamRecord<String>("1337", 
initialTime));
+               testHarness.processElement(new StreamRecord<>(42, initialTime), 
1, 1);
+               testHarness.processElement(new StreamRecord<>(1337, 
initialTime), 1, 1);
+               expectedOutput.add(new StreamRecord<>("42", initialTime));
+               expectedOutput.add(new StreamRecord<>("1337", initialTime));
 
                testHarness.waitForInputProcessing();
                // we should not yet see the barrier, only the two elements 
from non-blocked input
@@ -365,8 +372,8 @@ public void testOvertakingCheckpointBarriers() throws 
Exception {
                testHarness.processEvent(new CheckpointBarrier(1, 1, 
CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
 
                expectedOutput.add(new CancelCheckpointMarker(0));
-               expectedOutput.add(new StreamRecord<String>("Hello-0-0", 
initialTime));
-               expectedOutput.add(new StreamRecord<String>("Ciao-0-0", 
initialTime));
+               expectedOutput.add(new StreamRecord<>("Hello-0-0", 
initialTime));
+               expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));
                expectedOutput.add(new CheckpointBarrier(1, 1, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
 
                testHarness.waitForInputProcessing();
@@ -611,5 +618,118 @@ public String map2(Integer value) throws Exception {
                        return value.toString();
                }
        }
+
+       @Test
+       public void testMutableObjectReuse() throws Exception {
+               final TwoInputStreamTaskTestHarness<String, String, String> 
testHarness = new TwoInputStreamTaskTestHarness<>(
+                       TwoInputStreamTask::new,
+                       new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO),
+                       new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO),
+                       new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO));
+
+               testHarness.setupOperatorChain(new OperatorID(), new 
TestMutableObjectReuseHeadOperator())
+                       .chain(new OperatorID(), new 
TestMutableObjectReuseHeadOperator.TestMutableObjectReuseNextOperator(),
+                               new TupleSerializer(Tuple2.class,
+                                       new 
TypeSerializer<?>[]{BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()), BasicTypeInfo.INT_TYPE_INFO.createSerializer(new 
ExecutionConfig())}))
+                       .finish();
+
+               ExecutionConfig executionConfig = 
testHarness.getExecutionConfig();
+               executionConfig.enableObjectReuse();
+
+               long initialTime = 0L;
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
+
+               testHarness.invoke();
+               testHarness.waitForTaskRunning();
+
+               testHarness.processElement(new 
StreamRecord<>(Tuple2.of("Hello", 1)), 0, 0);
+               testHarness.processElement(new 
StreamRecord<>(Tuple2.of("Hello", 2), initialTime + 1), 1, 0);
+               testHarness.processElement(new Watermark(initialTime + 1), 0, 
0);
+               testHarness.processElement(new Watermark(initialTime + 1), 1, 
0);
+               testHarness.processElement(new StreamRecord<>(Tuple2.of("Ciao", 
1), initialTime + 2), 0, 0);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
+               testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
+               testHarness.processElement(new StreamRecord<>(Tuple2.of("Ciao", 
2), initialTime + 3), 1, 0);
+
+               expectedOutput.add(new StreamRecord<>(Tuple2.of("Hello", 1)));
+               expectedOutput.add(new StreamRecord<>(Tuple2.of("Hello", 2), 
initialTime + 1));
+               expectedOutput.add(new Watermark(initialTime + 1));
+               expectedOutput.add(new StreamRecord<>(Tuple2.of("Ciao", 1), 
initialTime + 2));
+               expectedOutput.add(new CheckpointBarrier(0, 0, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+               expectedOutput.add(new StreamRecord<>(Tuple2.of("Ciao", 2), 
initialTime + 3));
+
+               testHarness.endInput();
+
+               testHarness.waitForTaskCompletion();
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.",
+                       expectedOutput,
+                       testHarness.getOutput());
+       }
+
+       // This must only be used in one test, otherwise the static fields will 
be changed
+       // by several tests concurrently
+       private static class TestMutableObjectReuseHeadOperator
+               extends AbstractStreamOperator<Tuple2<String, Integer>>
+               implements TwoInputStreamOperator<Tuple2<String, Integer>, 
Tuple2<String, Integer>, Tuple2<String, Integer>> {
+
+               private static final long serialVersionUID = 1L;
+
+               private static Object headOperatorValue;
+
+               private Object prevRecord1 = null;
+               private Object prevValue1 = null;
+
+               private Object prevRecord2 = null;
+               private Object prevValue2 = null;
+
+               @Override
+               public void processElement1(StreamRecord<Tuple2<String, 
Integer>> element) throws Exception {
+                       if (prevRecord1 != null) {
+                               assertNotEquals("Reuse StreamRecord object in 
the 1th input of the head operator.", prevRecord1, element);
+                               assertEquals("No reuse value object in the 1th 
input of the head operator.", prevValue1, element.getValue());
+                       }
+
+                       prevRecord1 = element;
+                       prevValue1 = element.getValue();
+
+                       headOperatorValue = element.getValue();
+
+                       output.collect(element);
+               }
+
+               @Override
+               public void processElement2(StreamRecord<Tuple2<String, 
Integer>> element) {
+                       if (prevRecord2 != null) {
+                               assertNotEquals("Reuse StreamRecord object in 
the 2th input of the head operator.", prevRecord2, element);
+                               assertEquals("No reuse value object in the 2th 
input of the head operator.", prevValue2, element.getValue());
+
+                               if (prevValue1 != null) {
+                                       assertTrue("Reuse the same value object 
in two inputs of the head operator.", prevValue1 != prevValue2);
+                               }
+                       }
+
+                       prevRecord2 = element;
+                       prevValue2 = element.getValue();
+
+                       headOperatorValue = element.getValue();
+
+                       output.collect(element);
+               }
+
+               private static class TestMutableObjectReuseNextOperator
+                       extends AbstractStreamOperator<Tuple2<String, Integer>>
+                       implements OneInputStreamOperator<Tuple2<String, 
Integer>, Tuple2<String, Integer>> {
+
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void processElement(StreamRecord<Tuple2<String, 
Integer>> element) throws Exception {
+                               assertEquals("No reuse value object in chain.", 
headOperatorValue, element.getValue());
+
+                               output.collect(element);
+                       }
+               }
+       }
 }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
