Repository: flink
Updated Branches:
  refs/heads/master 2d191ab05 -> a2eb6cc87


http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
new file mode 100644
index 0000000..d3fde9e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.timestamp;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for timestamps, watermarks, and event-time sources.
+ */
+@SuppressWarnings("serial")
+public class TimestampITCase {
+
+       private static final int NUM_TASK_MANAGERS = 2;
+       private static final int NUM_TASK_SLOTS = 3;
+       private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+
+       private static ForkableFlinkMiniCluster cluster;
+
+       @BeforeClass
+       public static void startCluster() {
+               try {
+                       Configuration config = new Configuration();
+                       config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+                       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+                       config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
+                       config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+
+                       cluster = new ForkableFlinkMiniCluster(config, false);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail("Failed to start test cluster: " + e.getMessage());
+               }
+       }
+
+       @AfterClass
+       public static void shutdownCluster() {
+               try {
+                       cluster.shutdown();
+                       cluster = null;
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail("Failed to stop test cluster: " + e.getMessage());
+               }
+       }
+
+       /**
+        * These check whether custom timestamp emission works at sources and also whether timestamps
+        * arrive at operators throughout a topology.
+        *
+        * <p>
+        * This only uses map to test the workings of watermarks in a complete, running topology. All
+        * tasks and stream operators have dedicated tests that test the watermark propagation
+        * behaviour.
+        */
+       @Test
+       public void testWatermarkPropagation() throws Exception {
+               final int NUM_WATERMARKS = 10;
+
+               long initialTime = 0L;
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+                               "localhost", cluster.getJobManagerRPCPort());
+               env.setParallelism(PARALLELISM);
+               env.getConfig().disableSysoutLogging();
+               env.getConfig().enableTimestamps();
+
+
+               DataStream<Integer> source1 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS));
+               DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS));
+
+               source1
+                               .map(new IdentityMap())
+                               .connect(source2).map(new IdentityCoMap())
+                               .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator());
+
+               env.execute();
+
+               // verify that all the watermarks arrived at the final custom operator
+               for (int i = 0; i < PARALLELISM; i++) {
+                       for (int j = 0; j < NUM_WATERMARKS; j++) {
+                               if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
+                                       Assert.fail("Wrong watermark.");
+                               }
+                       }
+               }
+       }
+
+
+
+       /**
+        * These check whether timestamps are properly assigned at the sources and handled in
+        * network transmission and between chained operators when timestamps are enabled.
+        */
+       @Test
+       public void testTimestampHandling() throws Exception {
+               final int NUM_ELEMENTS = 10;
+
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+                               "localhost", cluster.getJobManagerRPCPort());
+               env.setParallelism(PARALLELISM);
+               env.getConfig().disableSysoutLogging();
+               env.getConfig().enableTimestamps();
+
+
+               DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS));
+               DataStream<Integer> source2 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS));
+
+               source1
+                               .map(new IdentityMap())
+                               .connect(source2).map(new IdentityCoMap())
+                               .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
+
+               env.execute();
+       }
+
+       /**
+        * These check whether timestamps are properly ignored when they are disabled.
+        */
+       @Test
+       public void testDisabledTimestamps() throws Exception {
+               final int NUM_ELEMENTS = 10;
+
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+                               "localhost", cluster.getJobManagerRPCPort());
+               env.setParallelism(PARALLELISM);
+               env.getConfig().disableSysoutLogging();
+               Assert.assertEquals("Timestamps are not disabled by default.", false, env.getConfig().areTimestampsEnabled());
+               env.getConfig().disableTimestamps();
+
+
+               DataStream<Integer> source1 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS));
+               DataStream<Integer> source2 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS));
+
+               source1
+                               .map(new IdentityMap())
+                               .connect(source2).map(new IdentityCoMap())
+                               .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator());
+
+               env.execute();
+       }
+
+       /**
+        * This tests whether the program throws an exception when an event-time source tries
+        * to emit without a timestamp.
+        */
+       @Test(expected = ProgramInvocationException.class)
+       public void testEventTimeSourceEmitWithoutTimestamp() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+               env.setParallelism(PARALLELISM);
+               env.getConfig().disableSysoutLogging();
+
+               DataStream<Integer> source1 = env.addSource(new MyErroneousTimestampSource());
+
+               source1
+                               .map(new IdentityMap());
+
+               env.execute();
+       }
+
+       /**
+        * This tests whether the program throws an exception when a regular source tries
+        * to emit with a timestamp.
+        */
+       @Test(expected = ProgramInvocationException.class)
+       public void testSourceEmitWithTimestamp() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+               env.setParallelism(PARALLELISM);
+               env.getConfig().disableSysoutLogging();
+
+               DataStream<Integer> source1 = env.addSource(new MyErroneousSource());
+
+               source1
+                               .map(new IdentityMap());
+
+               env.execute();
+       }
+
+       /**
+        * This tests whether the program throws an exception when a regular source tries
+        * to emit a watermark.
+        */
+       @Test(expected = ProgramInvocationException.class)
+       public void testSourceEmitWatermark() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+               env.setParallelism(PARALLELISM);
+               env.getConfig().disableSysoutLogging();
+
+               DataStream<Integer> source1 = env.addSource(new MyErroneousWatermarkSource());
+
+               source1
+                               .map(new IdentityMap());
+
+               env.execute();
+       }
+
+       public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
+
+               List<Watermark> watermarks;
+               public static List<Watermark>[] finalWatermarks = new List[PARALLELISM];
+               private long oldTimestamp;
+
+               @Override
+               public void processElement(StreamRecord<Integer> element) throws Exception {
+                       if (element.getTimestamp() != element.getValue()) {
+                               Assert.fail("Timestamps are not properly handled.");
+                       }
+                       oldTimestamp = element.getTimestamp();
+                       output.collect(element);
+               }
+
+               @Override
+               public void processWatermark(Watermark mark) throws Exception {
+                       watermarks.add(mark);
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       watermarks = new ArrayList<Watermark>();
+               }
+
+               @Override
+               public void close() throws Exception {
+                       super.close();
+                       finalWatermarks[getRuntimeContext().getIndexOfThisSubtask()] = watermarks;
+               }
+       }
+
+       public static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
+
+               @Override
+               public void processElement(StreamRecord<Integer> element) throws Exception {
+                       if (element.getTimestamp() != element.getValue()) {
+                               Assert.fail("Timestamps are not properly handled.");
+                       }
+                       output.collect(element);
+               }
+
+               @Override
+               public void processWatermark(Watermark mark) throws Exception {
+               }
+       }
+
+       public static class DisabledTimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
+
+               @Override
+               public void processElement(StreamRecord<Integer> element) throws Exception {
+                       if (element.getTimestamp() != 0) {
+                               Assert.fail("Timestamps are not properly handled.");
+                       }
+                       output.collect(element);
+               }
+
+               @Override
+               public void processWatermark(Watermark mark) throws Exception {
+               }
+       }
+
+       public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> {
+               @Override
+               public Integer map1(Integer value) throws Exception {
+                       return value;
+               }
+
+               @Override
+               public Integer map2(Integer value) throws Exception {
+                       return value;
+               }
+       }
+
+       public static class IdentityMap implements MapFunction<Integer, Integer> {
+               @Override
+               public Integer map(Integer value) throws Exception {
+                       return value;
+               }
+       }
+
+       public static class MyTimestampSource implements EventTimeSourceFunction<Integer> {
+
+               long initialTime;
+               int numWatermarks;
+
+               public MyTimestampSource(long initialTime, int numWatermarks) {
+                       this.initialTime = initialTime;
+                       this.numWatermarks = numWatermarks;
+               }
+
+               @Override
+               public void run(SourceContext<Integer> ctx) throws Exception {
+                       for (int i = 0; i < numWatermarks; i++) {
+                               ctx.collectWithTimestamp(i, initialTime + i);
+                               ctx.emitWatermark(new Watermark(initialTime + i));
+                       }
+               }
+
+               @Override
+               public void cancel() {
+
+               }
+       }
+
+       public static class MyNonWatermarkingSource implements SourceFunction<Integer> {
+
+               int numWatermarks;
+
+               public MyNonWatermarkingSource(int numWatermarks) {
+                       this.numWatermarks = numWatermarks;
+               }
+
+               @Override
+               public void run(SourceContext<Integer> ctx) throws Exception {
+                       for (int i = 0; i < numWatermarks; i++) {
+                               ctx.collect(i);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+
+               }
+       }
+
+       // This is an event-time source. This should only emit elements with timestamps. The test should
+       // therefore throw an exception.
+       public static class MyErroneousTimestampSource implements EventTimeSourceFunction<Integer> {
+
+               @Override
+               public void run(SourceContext<Integer> ctx) throws Exception {
+                       for (int i = 0; i < 10; i++) {
+                               ctx.collect(i);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+
+               }
+       }
+
+       // This is a normal source. This should only emit elements without timestamps. The test should
+       // therefore throw an exception.
+       public static class MyErroneousSource implements SourceFunction<Integer> {
+
+               @Override
+               public void run(SourceContext<Integer> ctx) throws Exception {
+                       for (int i = 0; i < 10; i++) {
+                               ctx.collectWithTimestamp(i, 0L);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+
+               }
+       }
+
+       // This is a normal source. This should only emit elements without timestamps. This also
+       // must not emit watermarks. The test should therefore throw an exception.
+       public static class MyErroneousWatermarkSource implements SourceFunction<Integer> {
+
+               @Override
+               public void run(SourceContext<Integer> ctx) throws Exception {
+                       for (int i = 0; i < 10; i++) {
+                               ctx.collect(i);
+                               ctx.emitWatermark(new Watermark(0L));
+                       }
+               }
+
+               @Override
+               public void cancel() {
+
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
deleted file mode 100644
index 0467b5f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.io.CoReaderIterator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-public class MockCoContext<IN1, IN2, OUT> {
-       // private Collection<IN1> input1;
-       // private Collection<IN2> input2;
-       private Iterator<IN1> inputIterator1;
-       private Iterator<IN2> inputIterator2;
-       private List<OUT> outputs;
-
-       private Output<OUT> collector;
-       private StreamRecordSerializer<IN1> inDeserializer1;
-       private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> mockIterator;
-       private StreamRecordSerializer<IN2> inDeserializer2;
-
-       public MockCoContext(Collection<IN1> input1, Collection<IN2> input2) {
-
-               if (input1.isEmpty() || input2.isEmpty()) {
-                       throw new RuntimeException("Inputs must not be empty");
-               }
-
-               this.inputIterator1 = input1.iterator();
-               this.inputIterator2 = input2.iterator();
-
-               TypeInformation<IN1> inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next());
-               inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1, new ExecutionConfig());
-               TypeInformation<IN2> inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next());
-               inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2, new ExecutionConfig());
-
-               mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2);
-
-               outputs = new ArrayList<OUT>();
-               collector = new MockOutput<OUT>(outputs);
-       }
-
-       private int currentInput = 1;
-       private StreamRecord<IN1> reuse1;
-       private StreamRecord<IN2> reuse2;
-
-       private class MockCoReaderIterator extends
-                       CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> {
-
-               public MockCoReaderIterator(TypeSerializer<StreamRecord<IN1>> serializer1,
-                               TypeSerializer<StreamRecord<IN2>> serializer2) {
-                       super(null, serializer1, serializer2);
-                       reuse1 = inDeserializer1.createInstance();
-                       reuse2 = inDeserializer2.createInstance();
-               }
-
-               @Override
-               public int next(StreamRecord<IN1> target1, StreamRecord<IN2> target2) throws IOException {
-                       this.delegate1.setInstance(target1);
-                       this.delegate2.setInstance(target2);
-
-                       int inputNumber = nextRecord();
-                       target1.setObject(reuse1.getObject());
-                       target2.setObject(reuse2.getObject());
-
-                       return inputNumber;
-               }
-       }
-
-       private Integer nextRecord() {
-               if (inputIterator1.hasNext() && inputIterator2.hasNext()) {
-                       switch (currentInput) {
-                       case 1:
-                               return next1();
-                       case 2:
-                               return next2();
-                       default:
-                               return 0;
-                       }
-               }
-
-               if (inputIterator1.hasNext()) {
-                       return next1();
-               }
-
-               if (inputIterator2.hasNext()) {
-                       return next2();
-               }
-
-               return 0;
-       }
-
-       private int next1() {
-               reuse1 = inDeserializer1.createInstance();
-               reuse1.setObject(inputIterator1.next());
-               currentInput = 2;
-               return 1;
-       }
-
-       private int next2() {
-               reuse2 = inDeserializer2.createInstance();
-               reuse2.setObject(inputIterator2.next());
-               currentInput = 1;
-               return 2;
-       }
-
-       public List<OUT> getOutputs() {
-               return outputs;
-       }
-
-       public Output<OUT> getCollector() {
-               return collector;
-       }
-
-       public StreamRecordSerializer<IN1> getInDeserializer1() {
-               return inDeserializer1;
-       }
-
-       public StreamRecordSerializer<IN2> getInDeserializer2() {
-               return inDeserializer2;
-       }
-
-       public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> getIterator() {
-               return mockIterator;
-       }
-
-       public static <IN1, IN2, OUT> List<OUT> createAndExecute(TwoInputStreamOperator<IN1, IN2, OUT> operator,
-                       List<IN1> input1, List<IN2> input2) {
-               MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2);
-               StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("CoMockTask",
-                               new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
-                               new ExecutionConfig(), null, null, new HashMap<String, Accumulator<?, ?>>());
-
-               operator.setup(mockContext.collector, runtimeContext);
-
-               try {
-                       operator.open(null);
-
-                       StreamRecordSerializer<IN1> inputDeserializer1 = mockContext.getInDeserializer1();
-                       StreamRecordSerializer<IN2> inputDeserializer2 = mockContext.getInDeserializer2();
-                       CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter = mockContext.mockIterator;
-
-                       boolean isRunning = true;
-
-                       int next;
-                       StreamRecord<IN1> reuse1 = inputDeserializer1.createInstance();
-                       StreamRecord<IN2> reuse2 = inputDeserializer2.createInstance();
-
-                       while (isRunning) {
-                               try {
-                                       next = coIter.next(reuse1, reuse2);
-                               } catch (IOException e) {
-                                       if (isRunning) {
-                                               throw new RuntimeException("Could not read next record.", e);
-                                       } else {
-                                               // Task already cancelled do nothing
-                                               next = 0;
-                                       }
-                               } catch (IllegalStateException e) {
-                                       if (isRunning) {
-                                               throw new RuntimeException("Could not read next record.", e);
-                                       } else {
-                                               // Task already cancelled do nothing
-                                               next = 0;
-                                       }
-                               }
-
-                               if (next == 0) {
-                                       break;
-                               } else if (next == 1) {
-                                       operator.processElement1(reuse1.getObject());
-                                       reuse1 = inputDeserializer1.createInstance();
-                               } else {
-                                       operator.processElement2(reuse2.getObject());
-                                       reuse2 = inputDeserializer2.createInstance();
-                               }
-                       }
-
-                       operator.close();
-               } catch (Exception e) {
-                       throw new RuntimeException("Cannot invoke operator.", e);
-               }
-
-               return mockContext.getOutputs();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 0d09c14..45ae88f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -17,34 +17,25 @@
 
 package org.apache.flink.streaming.util;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
 
 public class MockContext<IN, OUT> {
        private Collection<IN> inputs;
        private List<OUT> outputs;
 
        private MockOutput<OUT> output;
-       private StreamRecordSerializer<IN> inDeserializer;
-       private IndexedReaderIterator<StreamRecord<IN>> iterator;
 
        public MockContext(Collection<IN> inputs) {
                this.inputs = inputs;
@@ -52,58 +43,19 @@ public class MockContext<IN, OUT> {
                        throw new RuntimeException("Inputs must not be empty");
                }
 
-               TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next());
-               inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo, new ExecutionConfig());
-
-               iterator = new IndexedInputIterator();
                outputs = new ArrayList<OUT>();
                output = new MockOutput<OUT>(outputs);
        }
 
-       private class IndexedInputIterator extends IndexedReaderIterator<StreamRecord<IN>> {
-               Iterator<IN> listIterator;
-
-               public IndexedInputIterator() {
-                       super(null, null);
-                       listIterator = inputs.iterator();
-               }
-
-               @Override
-               public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException {
-                       if (listIterator.hasNext()) {
-                               reuse.setObject(listIterator.next());
-                       } else {
-                               reuse = null;
-                       }
-                       return reuse;
-               }
-
-               @Override
-               public StreamRecord<IN> next() throws IOException {
-                       if (listIterator.hasNext()) {
-                               StreamRecord<IN> result = inDeserializer.createInstance();
-                               result.setObject(listIterator.next());
-                               return result;
-                       } else {
-                               return null;
-                       }
-               }
-       }
-
        public List<OUT> getOutputs() {
                return outputs;
        }
 
-       public Collector<OUT> getOutput() {
+       public Output<StreamRecord<OUT>> getOutput() {
                return output;
        }
 
-       public MutableObjectIterator<StreamRecord<IN>> getIterator() {
-               return iterator;
-       }
-
-       public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator,
-                       List<IN> inputs) {
+       public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) {
                MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
                StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask",
                                new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
@@ -114,8 +66,8 @@ public class MockContext<IN, OUT> {
                        operator.open(null);
 
                        StreamRecord<IN> nextRecord;
-                       while ((nextRecord = mockContext.getIterator().next()) != null) {
-                               operator.processElement(nextRecord.getObject());
+                       for (IN in: inputs) {
+                               operator.processElement(new StreamRecord<IN>(in));
                        }
 
                        operator.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java
index 6799d87..5371ba0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java
@@ -22,9 +22,10 @@ import java.util.Collection;
 
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.util.Collector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-public class MockOutput<T> implements Output<T> {
+public class MockOutput<T> implements Output<StreamRecord<T>> {
        private Collection<T> outputs;
 
        public MockOutput(Collection<T> outputs) {
@@ -32,13 +33,18 @@ public class MockOutput<T> implements Output<T> {
        }
 
        @Override
-       public void collect(T record) {
+       public void collect(StreamRecord<T> record) {
                T copied = SerializationUtils.deserialize(SerializationUtils
-                               .serialize((Serializable) record));
+                               .serialize((Serializable) record.getValue()));
                outputs.add(copied);
        }
 
        @Override
+       public void emitWatermark(Watermark mark) {
+               throw new RuntimeException("THIS MUST BE IMPLEMENTED");
+       }
+
+       @Override
        public void close() {
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
deleted file mode 100644
index 1731e7c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.mock;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamtask.MockRecordWriter;
-import org.mockito.Mockito;
-
-public class MockRecordWriterFactory {
-
-       @SuppressWarnings("unchecked")
-       public static MockRecordWriter create() {
-               MockRecordWriter recWriter = mock(MockRecordWriter.class);
-               
-               Mockito.when(recWriter.initList()).thenCallRealMethod();
-               doCallRealMethod().when(recWriter).emit(Mockito.any(SerializationDelegate.class));
-               
-               recWriter.initList();
-               
-               return recWriter;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
new file mode 100644
index 0000000..133f143
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * A test harness for testing a {@link OneInputStreamOperator}.
+ *
+ * <p>
+ * This mock task provides the operator with a basic runtime context and allows pushing elements
+ * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements
+ * and watermarks can be retrieved. You are free to modify these.
+ */
+public class OneInputStreamOperatorTestHarness<IN, OUT> {
+
+       OneInputStreamOperator<IN, OUT> operator;
+
+       ConcurrentLinkedQueue outputList;
+
+       ExecutionConfig executionConfig;
+
+       public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
+               this.operator = operator;
+
+               outputList = new ConcurrentLinkedQueue();
+
+               executionConfig = new ExecutionConfig();
+
+               StreamingRuntimeContext runtimeContext =  new StreamingRuntimeContext(
+                               "MockTwoInputTask",
+                               new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
+                               getClass().getClassLoader(),
+                               executionConfig,
+                               null,
+                               new LocalStateHandle.LocalStateHandleProvider<Serializable>(),
+                               new HashMap<String, Accumulator<?, ?>>());
+
+               operator.setup(new MockOutput(), runtimeContext);
+       }
+
+       /**
+        * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
+        * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
+        * to extract only the StreamRecords.
+        */
+       public Queue getOutput() {
+               return outputList;
+       }
+
+       /**
+        * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(org.apache.flink.configuration.Configuration)}
+        * with an empty {@link org.apache.flink.configuration.Configuration}.
+        */
+       public void open() throws Exception {
+               operator.open(new Configuration());
+       }
+
+       /**
+        * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(org.apache.flink.configuration.Configuration)}
+        * with the given {@link org.apache.flink.configuration.Configuration}.
+        */
+       public void open(Configuration config) throws Exception {
+               operator.open(config);
+       }
+
+       /**
+        * Calls close on the operator.
+        */
+       public void close() throws Exception {
+               operator.close();
+       }
+
+       public void processElement(StreamRecord<IN> element) throws Exception {
+               operator.processElement(element);
+       }
+
+       public void processElements(Collection<StreamRecord<IN>> elements) throws Exception {
+               for (StreamRecord<IN> element: elements) {
+                       operator.processElement(element);
+               }
+       }
+
+       public void processWatermark(Watermark mark) throws Exception {
+               operator.processWatermark(mark);
+       }
+
+       private class MockOutput implements Output<StreamRecord<OUT>> {
+
+               private TypeSerializer<OUT> outputSerializer;
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public void emitWatermark(Watermark mark) {
+                       outputList.add(mark);
+               }
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public void collect(StreamRecord<OUT> element) {
+                       if (outputSerializer == null) {
+                               outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
+                       }
+                       outputList.add(new StreamRecord<OUT>(outputSerializer.copy(element.getValue()),
+                                       element.getTimestamp()));
+               }
+
+               @Override
+               public void close() {
+                       // ignore
+               }
+       }
+}
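
For orientation, here is a minimal usage sketch (not part of this commit) of the new
harness driving a single operator. It assumes the existing StreamMap operator, the
TestHarnessUtil class added below, and that StreamRecord compares by value and timestamp:

import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;

public class StreamMapHarnessSketch {

	public static void main(String[] args) throws Exception {
		// operator under test: doubles every element; watermarks are assumed
		// to be forwarded unchanged
		StreamMap<Integer, Integer> operator = new StreamMap<Integer, Integer>(
				new MapFunction<Integer, Integer>() {
					@Override
					public Integer map(Integer value) throws Exception {
						return 2 * value;
					}
				});

		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
				new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);

		testHarness.open();
		// push one timestamped element and a watermark through the operator
		testHarness.processElement(new StreamRecord<Integer>(21, 42L));
		testHarness.processWatermark(new Watermark(42L));
		testHarness.close();

		// expect the mapped value with the original timestamp, then the watermark
		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
		expectedOutput.add(new StreamRecord<Integer>(42, 42L));
		expectedOutput.add(new Watermark(42L));
		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
	}
}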

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 764fe5f..2d7f6b5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -29,10 +29,13 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
 
 public class SourceFunctionUtil<T> {
 
@@ -40,25 +43,20 @@ public class SourceFunctionUtil<T> {
                List<T> outputs = new ArrayList<T>();
                if (sourceFunction instanceof RichFunction) {
                        RuntimeContext runtimeContext =  new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
-                                       new ExecutionConfig(), null, new LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>());
+                                       new ExecutionConfig(), null, new LocalStateHandle.LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>());
                        ((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
 
                        ((RichFunction) sourceFunction).open(new Configuration());
                }
                try {
-                       final Collector<T> collector = new MockOutput<T>(outputs);
-                       final Object lockObject = new Object();
-                       SourceFunction.SourceContext<T> ctx = new SourceFunction.SourceContext<T>() {
-                               @Override
-                               public void collect(T element) {
-                                       collector.collect(element);
-                               }
-
-                               @Override
-                               public Object getCheckpointLock() {
-                                       return lockObject;
-                               }
-                       };
+                       final Output<StreamRecord<T>> collector = new MockOutput<T>(outputs);
+                       final Object lockingObject = new Object();
+                       SourceFunction.SourceContext<T> ctx;
+                       if (sourceFunction instanceof EventTimeSourceFunction) {
+                               ctx = new StreamSource.ManualWatermarkContext<T>(lockingObject, collector);
+                       } else {
+                               ctx = new StreamSource.NonWatermarkContext<T>(lockingObject, collector);
+                       }
                        sourceFunction.run(ctx);
                } catch (Exception e) {
                        throw new RuntimeException("Cannot invoke source.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
new file mode 100644
index 0000000..a0a6c8d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Assert;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Utils for working with the various test harnesses.
+ */
+public class TestHarnessUtil {
+       /**
+        * Extracts the StreamRecords from the given output list.
+        */
+       @SuppressWarnings("unchecked")
+       public static <OUT> List<StreamRecord<OUT>> getStreamRecordsFromOutput(List output) {
+               List<StreamRecord<OUT>> resultElements = new LinkedList<StreamRecord<OUT>>();
+               for (Object e: output) {
+                       if (e instanceof StreamRecord) {
+                               resultElements.add((StreamRecord<OUT>) e);
+                       }
+               }
+               return resultElements;
+       }
+
+       /**
+        * Extracts the raw elements from the given output list.
+        */
+       @SuppressWarnings("unchecked")
+       public static <OUT> List<OUT> getRawElementsFromOutput(Queue output) {
+               List<OUT> resultElements = new LinkedList<OUT>();
+               for (Object e: output) {
+                       if (e instanceof StreamRecord) {
+                               resultElements.add((OUT) ((StreamRecord) e).getValue());
+                       }
+               }
+               return resultElements;
+       }
+
+       /**
+        * Compare the two queues containing operator/task output by converting them to an array first.
+        */
+       public static void assertOutputEquals(String message, Queue expected, Queue actual) {
+               Assert.assertArrayEquals(message,
+                               expected.toArray(),
+                               actual.toArray());
+
+       }
+}
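
A hypothetical follow-up to the harness sketch above: when timestamps and watermarks are
irrelevant to an assertion, getRawElementsFromOutput can be used to compare raw values only
(the `testHarness` variable is the one from the earlier sketch):

import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;

// ... inside a test, after running the harness sketch shown earlier:
List<Integer> values = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
Assert.assertEquals(Collections.singletonList(42), values);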

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
new file mode 100644
index 0000000..ea753f8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * A test harness for testing a {@link TwoInputStreamOperator}.
+ *
+ * <p>
+ * This mock task provides the operator with a basic runtime context and allows pushing elements
+ * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements
+ * and watermarks can be retrieved. You are free to modify these.
+ */
+public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
+
+       TwoInputStreamOperator<IN1, IN2, OUT> operator;
+
+       ConcurrentLinkedQueue outputList;
+
+       ExecutionConfig executionConfig;
+
+       public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) {
+               this.operator = operator;
+
+               outputList = new ConcurrentLinkedQueue();
+
+               executionConfig = new ExecutionConfig();
+
+               StreamingRuntimeContext runtimeContext =  new StreamingRuntimeContext(
+                               "MockTwoInputTask",
+                               new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
+                               getClass().getClassLoader(),
+                               new ExecutionConfig(),
+                               null,
+                               new LocalStateHandle.LocalStateHandleProvider<Serializable>(),
+                               new HashMap<String, Accumulator<?, ?>>());
+
+               operator.setup(new MockOutput(), runtimeContext);
+       }
+
+       /**
+        * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
+        * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
+        * to extract only the StreamRecords.
+        */
+       public Queue getOutput() {
+               return outputList;
+       }
+
+
+       /**
+        * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(Configuration)}
+        * with an empty {@link Configuration}.
+        */
+       public void open() throws Exception {
+               operator.open(new Configuration());
+       }
+
+       /**
+        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open(Configuration)}
+        * with the given {@link Configuration}.
+        */
+       public void open(Configuration config) throws Exception {
+               operator.open(config);
+       }
+
+       /**
+        * Calls close on the operator.
+        */
+       public void close() throws Exception {
+               operator.close();
+       }
+
+       public void processElement1(StreamRecord<IN1> element) throws Exception {
+               operator.processElement1(element);
+       }
+
+       public void processElement2(StreamRecord<IN2> element) throws Exception {
+               operator.processElement2(element);
+       }
+
+       public void processWatermark1(Watermark mark) throws Exception {
+               operator.processWatermark1(mark);
+       }
+
+       public void processWatermark2(Watermark mark) throws Exception {
+               operator.processWatermark2(mark);
+       }
+
+       private class MockOutput implements Output<StreamRecord<OUT>> {
+
+               private TypeSerializer<OUT> outputSerializer;
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public void emitWatermark(Watermark mark) {
+                       outputList.add(mark);
+               }
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public void collect(StreamRecord<OUT> element) {
+                       if (outputSerializer == null) {
+                               outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
+                       }
+                       outputList.add(new StreamRecord<OUT>(outputSerializer.copy(element.getValue()),
+                                       element.getTimestamp()));
+               }
+
+               @Override
+               public void close() {
+                       // ignore
+               }
+       }
+}
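
For illustration, a minimal sketch of how this harness can be used from a JUnit
test. The operator class "SampleCoOperator" is hypothetical and stands in for any
TwoInputStreamOperator under test; only the harness methods themselves are taken
from the code above.

	// imports mirror those at the top of TwoInputStreamOperatorTestHarness, plus JUnit
	@Test
	public void sketchUsage() throws Exception {
		TwoInputStreamOperatorTestHarness<String, Integer, String> harness =
				new TwoInputStreamOperatorTestHarness<String, Integer, String>(
						new SampleCoOperator()); // hypothetical operator under test

		harness.open();

		// push elements and watermarks into either input
		harness.processElement1(new StreamRecord<String>("hello", 1L));
		harness.processElement2(new StreamRecord<Integer>(42, 2L));
		harness.processWatermark1(new Watermark(2L));
		harness.processWatermark2(new Watermark(2L));

		// emitted records and watermarks arrive interleaved, in emission order
		Queue<Object> output = harness.getOutput();

		harness.close();
	}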

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
new file mode 100644
index 0000000..6197092
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * A simple test that runs a streaming topology with checkpointing enabled. This differs from
+ * {@link org.apache.flink.test.checkpointing.StreamCheckpointingITCase} in that it contains
+ * a TwoInput (or co-) Task.
+ *
+ * <p>
+ * This checks whether checkpoint barriers correctly trigger TwoInputTasks and also whether
+ * the barriers are correctly forwarded.
+ *
+ * <p>
+ * This uses a mixture of Operators with the {@link Checkpointed} interface and the new
+ * {@link org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext#getOperatorState}
+ * method.
+ *
+ * <p>
+ * The test triggers a failure after a while and verifies that, after completion, the
+ * state reflects the "exactly once" semantics.
+ */
+@SuppressWarnings("serial")
+public class CoStreamCheckpointingITCase {
+
+       private static final int NUM_TASK_MANAGERS = 2;
+       private static final int NUM_TASK_SLOTS = 3;
+       private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+
+       private static ForkableFlinkMiniCluster cluster;
+
+       @BeforeClass
+       public static void startCluster() {
+               try {
+                       Configuration config = new Configuration();
+                       config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+                       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+                       config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
+                       config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+
+                       cluster = new ForkableFlinkMiniCluster(config, false);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail("Failed to start test cluster: " + e.getMessage());
+               }
+       }
+
+       @AfterClass
+       public static void shutdownCluster() {
+               try {
+                       cluster.shutdown();
+                       cluster = null;
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail("Failed to stop test cluster: " + e.getMessage());
+               }
+       }
+
+       /**
+        * Runs the following program:
+        *
+        * <pre>
+        *     [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
+        * </pre>
+        */
+       @Test
+       public void runCheckpointedProgram() {
+
+               final long NUM_STRINGS = 10000000L;
+               assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+               try {
+                       StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+                                       "localhost", cluster.getJobManagerRPCPort());
+                       env.setParallelism(PARALLELISM);
+                       env.enableCheckpointing(500);
+                       env.getConfig().disableSysoutLogging();
+
+                       DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+
+                       stream
+                                       // -------------- first vertex, chained to the source ----------------
+                                       .filter(new StringRichFilterFunction())
+
+                                       // -------------- second vertex - the stateful one that also fails ----------------
+                                       .connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
+
+                                       // -------------- third vertex - the stateful one that also fails ----------------
+                                       .map(new StringPrefixCountRichMapFunction())
+                                       .startNewChain()
+                                       .map(new StatefulCounterFunction())
+
+                                       // -------------- fourth vertex - reducer and the sink ----------------
+                                       .groupBy("prefix")
+                                       .reduce(new OnceFailingReducer(NUM_STRINGS))
+                                       .addSink(new RichSinkFunction<PrefixCount>() {
+
+                                               private Map<Character, Long> counts = new HashMap<Character, Long>();
+
+                                               @Override
+                                               public void invoke(PrefixCount value) {
+                                                       Character first = value.prefix.charAt(0);
+                                                       Long previous = counts.get(first);
+                                                       if (previous == null) {
+                                                               counts.put(first, value.count);
+                                                       } else {
+                                                               counts.put(first, Math.max(previous, value.count));
+                                                       }
+                                               }
+
+//                                             @Override
+//                                             public void close() {
+//                                                     for (Long count : counts.values()) {
+//                                                             assertEquals(NUM_STRINGS / 40, count.longValue());
+//                                                     }
+//                                             }
+                                       });
+
+                       env.execute();
+
+                       long filterSum = 0;
+                       for (long l : StringRichFilterFunction.counts) {
+                               filterSum += l;
+                       }
+
+                       long coMapSum = 0;
+                       for (long l : LeftIdentityCoRichFlatMapFunction.counts) {
+                               coMapSum += l;
+                       }
+
+                       long mapSum = 0;
+                       for (long l : StringPrefixCountRichMapFunction.counts) {
+                               mapSum += l;
+                       }
+
+                       long countSum = 0;
+                       for (long l : StatefulCounterFunction.counts) {
+                               countSum += l;
+                       }
+
+                       if (!StringPrefixCountRichMapFunction.restoreCalledAtLeastOnce) {
+                               Assert.fail("Restore was never called on counting Map function.");
+                       }
+
+                       if (!LeftIdentityCoRichFlatMapFunction.restoreCalledAtLeastOnce) {
+                               Assert.fail("Restore was never called on counting CoMap function.");
+                       }
+
+                       // verify that we counted exactly right
+
+                       assertEquals(NUM_STRINGS, filterSum);
+                       assertEquals(NUM_STRINGS, coMapSum);
+                       assertEquals(NUM_STRINGS, mapSum);
+                       assertEquals(NUM_STRINGS, countSum);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Custom Functions
+       // --------------------------------------------------------------------------------------------
+
+       private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
+                       implements ParallelSourceFunction<String> {
+
+               private final long numElements;
+
+               private Random rnd;
+               private StringBuilder stringBuilder;
+
+               private OperatorState<Integer> index;
+               private int step;
+
+               private volatile boolean isRunning;
+
+               static final long[] counts = new long[PARALLELISM];
+
+               @Override
+               public void close() throws IOException {
+                       counts[getRuntimeContext().getIndexOfThisSubtask()] = index.value();
+               }
+
+               StringGeneratingSourceFunction(long numElements) {
+                       this.numElements = numElements;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws IOException {
+                       rnd = new Random();
+                       stringBuilder = new StringBuilder();
+                       step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+                       index = getRuntimeContext().getOperatorState("index", getRuntimeContext().getIndexOfThisSubtask(), false);
+
+                       isRunning = true;
+               }
+
+               @Override
+               public void run(SourceContext<String> ctx) throws Exception {
+                       final Object lockingObject = ctx.getCheckpointLock();
+
+                       while (isRunning && index.value() < numElements) {
+                               char first = (char) ((index.value() % 40) + 40);
+
+                               stringBuilder.setLength(0);
+                               stringBuilder.append(first);
+
+                               String result = randomString(stringBuilder, rnd);
+
+                               synchronized (lockingObject) {
+                                       index.update(index.value() + step);
+//                                     System.out.println("SOURCE EMIT: " + result);
+                                       ctx.collect(result);
+                               }
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       isRunning = false;
+               }
+
+               private static String randomString(StringBuilder bld, Random rnd) {
+                       final int len = rnd.nextInt(10) + 5;
+
+                       for (int i = 0; i < len; i++) {
+                               char next = (char) (rnd.nextInt(20000) + 33);
+                               bld.append(next);
+                       }
+
+                       return bld.toString();
+               }
+       }
+
+       private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> {
+
+               private OperatorState<Long> count;
+               static final long[] counts = new long[PARALLELISM];
+
+               @Override
+               public PrefixCount map(PrefixCount value) throws Exception {
+                       count.update(count.value() + 1);
+                       return value;
+               }
+
+               @Override
+               public void open(Configuration conf) throws IOException {
+                       count = getRuntimeContext().getOperatorState("count", 0L, false);
+               }
+
+               @Override
+               public void close() throws IOException {
+                       counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
+               }
+
+       }
+
+       private static class OnceFailingReducer extends RichReduceFunction<PrefixCount> {
+
+               private static volatile boolean hasFailed = false;
+
+               private final long numElements;
+
+               private long failurePos;
+               private long count;
+
+               OnceFailingReducer(long numElements) {
+                       this.numElements = numElements;
+               }
+
+               @Override
+               public void open(Configuration parameters) {
+                       long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+                       long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+
+                       // nextLong() may be negative; take the absolute value so that
+                       // failurePos stays within [failurePosMin, failurePosMax)
+                       failurePos = (Math.abs(new Random().nextLong()) % (failurePosMax - failurePosMin)) + failurePosMin;
+                       count = 0;
+               }
+
+               @Override
+               public PrefixCount reduce(PrefixCount value1, PrefixCount value2) throws Exception {
+                       count++;
+                       if (!hasFailed && count >= failurePos) {
+                               hasFailed = true;
+                               throw new Exception("Test Failure");
+                       }
+
+                       value1.count += value2.count;
+                       return value1;
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Custom Type Classes
+       // --------------------------------------------------------------------------------------------
+
+       public static class PrefixCount {
+
+               public String prefix;
+               public String value;
+               public long count;
+
+               public PrefixCount() {}
+
+               public PrefixCount(String prefix, String value, long count) {
+                       this.prefix = prefix;
+                       this.value = value;
+                       this.count = count;
+               }
+
+               @Override
+               public String toString() {
+                       return prefix + " / " + value;
+               }
+       }
+
+       private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
+
+               Long count = 0L;
+               static final long[] counts = new long[PARALLELISM];
+
+               @Override
+               public boolean filter(String value) {
+                       count++;
+                       return value.length() < 100;
+               }
+
+               @Override
+               public void close() {
+                       counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+               }
+
+               @Override
+               public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+                       return count;
+               }
+
+               @Override
+               public void restoreState(Long state) {
+                       count = state;
+               }
+       }
+
+       private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> {
+
+               private long count = 0;
+               static final long[] counts = new long[PARALLELISM];
+               static volatile boolean restoreCalledAtLeastOnce = false;
+
+               @Override
+               public PrefixCount map(String value) throws IOException {
+                       count += 1;
+                       return new PrefixCount(value.substring(0, 1), value, 1L);
+               }
+
+               @Override
+               public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+                       return count;
+               }
+
+               @Override
+               public void restoreState(Long state) {
+                       restoreCalledAtLeastOnce = true;
+                       count = state;
+                       if (count == 0) {
+                               throw new RuntimeException("Restore from beginning");
+                       }
+               }
+
+               @Override
+               public void close() throws IOException {
+                       counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+               }
+       }
+
+       private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements Checkpointed<Long> {
+
+               long count = 0;
+               static final long[] counts = new long[PARALLELISM];
+
+               static volatile boolean restoreCalledAtLeastOnce = false;
+
+               @Override
+               public void flatMap1(String value, Collector<String> out) throws IOException {
+                       count += 1;
+//                     System.out.println("Co-Map COUNT: " + count);
+
+                       out.collect(value);
+               }
+
+               @Override
+               public void flatMap2(String value, Collector<String> out) throws IOException {
+                       // we ignore the values from the second input
+               }
+
+               @Override
+               public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+                       return count;
+               }
+
+               @Override
+               public void restoreState(Long state) {
+                       restoreCalledAtLeastOnce = true;
+                       count = state;
+               }
+
+               @Override
+               public void close() throws IOException {
+                       counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+               }
+       }
+}
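
The {@link Checkpointed} functions above all follow the same snapshot/restore
contract. As a minimal sketch of that contract (the CountingMapper class below is
hypothetical; the method signatures match those implemented in the test):

	// Hypothetical example of the Checkpointed contract: snapshotState() hands the
	// current state to a checkpoint, restoreState() re-installs it after a failure,
	// before processing resumes.
	private static class CountingMapper extends RichMapFunction<String, String>
			implements Checkpointed<Long> {

		private long count = 0;

		@Override
		public String map(String value) {
			count++; // state that must survive a failure
			return value;
		}

		@Override
		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
			return count;
		}

		@Override
		public void restoreState(Long state) {
			count = state;
		}
	}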

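StringGeneratingSourceFunction and StatefulCounterFunction use the second mechanism
named in the class Javadoc, getOperatorState, where registered state is checkpointed
and restored by the runtime without explicit snapshot/restore methods. A minimal
sketch of that pattern (the StatefulFilter class is hypothetical; the
getOperatorState call mirrors the usage in the test above):

	// Hypothetical example of the getOperatorState pattern: the registered "seen"
	// state is snapshotted and restored by the runtime itself.
	private static class StatefulFilter extends RichFilterFunction<String> {

		private OperatorState<Long> seen;

		@Override
		public void open(Configuration parameters) throws IOException {
			// name, default value, non-partitioned state (as in the test above)
			seen = getRuntimeContext().getOperatorState("seen", 0L, false);
		}

		@Override
		public boolean filter(String value) throws Exception {
			seen.update(seen.value() + 1);
			return true;
		}
	}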