http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
new file mode 100644
index 0000000..15d1fd5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.state.checkpoint;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.streaming.state.MapState;
+import org.apache.flink.streaming.state.OperatorState;
+
+public class MapCheckpoint<K, V> extends StateCheckpoint<Map<K, V>> {
+
+       private static final long serialVersionUID = 1L;
+
+       protected Set<K> removedItems;
+       protected Map<K, V> updatedItems;
+       protected boolean clear;
+
+       @SuppressWarnings("unchecked")
+       public MapCheckpoint(OperatorState<Map<K, V>> operatorState) {
+               if (operatorState instanceof MapState) {
+                       MapState<K, V> mapState = (MapState<K, V>) operatorState;
+
+                       this.removedItems = mapState.getRemovedItems();
+                       this.clear = mapState.isCleared();
+
+                       this.updatedItems = new HashMap<K, V>();
+                       for (K key : mapState.getUpdatedItems()) {
+                               this.updatedItems.put(key, mapState.get(key));
+                       }
+                       this.checkpointedState = this.updatedItems;
+
+               } else {
+                       throw new RuntimeException("MapCheckpoint can only be used with MapState");
+               }
+
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public StateCheckpoint<Map<K, V>> update(StateCheckpoint<Map<K, V>> nextCheckpoint) {
+               MapCheckpoint<K, V> mapCheckpoint = (MapCheckpoint<K, V>) nextCheckpoint;
+               if (this.checkpointedState == null) {
+                       this.checkpointedState = mapCheckpoint.updatedItems;
+               } else {
+                       if (mapCheckpoint.clear) {
+                               this.checkpointedState.clear();
+                       }
+                       for (Object key : mapCheckpoint.removedItems) {
+                               this.checkpointedState.remove(key);
+                       }
+                       this.checkpointedState.putAll(mapCheckpoint.updatedItems);
+               }
+               return this;
+       }
+}

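MapCheckpoint.update() applies the newer checkpoint's deltas in a fixed order: clear first, then removals, then updates. Below is a stand-alone sketch of that merge order using plain java.util collections; it is not part of the commit, and the class name and sample values are invented for illustration only.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MapCheckpointMergeSketch {
        public static void main(String[] args) {
                // State held by an earlier (full) checkpoint.
                Map<String, Integer> base = new HashMap<String, Integer>();
                base.put("a", 1);
                base.put("b", 2);

                // Deltas carried by a newer, incremental checkpoint.
                boolean cleared = false;
                Set<String> removedItems = new HashSet<String>(Arrays.asList("a"));
                Map<String, Integer> updatedItems = new HashMap<String, Integer>();
                updatedItems.put("c", 3);

                // Same order as MapCheckpoint.update(): clear, remove, then put.
                if (cleared) {
                        base.clear();
                }
                for (String key : removedItems) {
                        base.remove(key);
                }
                base.putAll(updatedItems);

                System.out.println(base); // {b=2, c=3} (iteration order may vary)
        }
}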
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java
new file mode 100644
index 0000000..8b76245
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.state.checkpoint;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.state.OperatorState;
+
+/**
+ * Base class for creating checkpoints of an {@link OperatorState}. These
+ * checkpoints are used to back up the state of stateful Flink operators and to
+ * restore it in case of node failure. To allow incremental checkpoints,
+ * override the {@link #update(StateCheckpoint)} method.
+ * 
+ * @param <T>
+ *            The type of the state.
+ */
+public class StateCheckpoint<T> implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       T checkpointedState;
+
+       /**
+        * Creates a state checkpoint from the given {@link OperatorState}
+        * 
+        * @param operatorState
+        *            The {@link OperatorState} to checkpoint.
+        */
+       public StateCheckpoint(OperatorState<T> operatorState) {
+               this.checkpointedState = operatorState.getState();
+       }
+
+       public StateCheckpoint() {
+               this.checkpointedState = null;
+       }
+
+       /**
+        * Returns the state object for the checkpoint.
+        * 
+        * @return The checkpointed state object.
+        */
+       public T getCheckpointedState() {
+               return checkpointedState;
+       }
+
+       /**
+        * Updates this checkpoint from the next one. Override this method to
+        * allow incremental updates.
+        * 
+        * @param nextCheckpoint
+        *            The {@link StateCheckpoint} to update from.
+        */
+       public StateCheckpoint<T> update(StateCheckpoint<T> nextCheckpoint) {
+               this.checkpointedState = nextCheckpoint.getCheckpointedState();
+               return this;
+       }
+
+       @Override
+       public String toString() {
+               return checkpointedState.toString();
+       }
+
+       public boolean stateEquals(StateCheckpoint<T> other) {
+               return checkpointedState.equals(other.checkpointedState);
+       }
+}

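The class is meant to be subclassed for incremental checkpointing (MapCheckpoint above is one example). The following hypothetical, minimal subclass is a sketch only, not part of the commit; it accumulates a running count instead of overwriting it and assumes it lives in the same org.apache.flink.streaming.state.checkpoint package so it can reach the package-private checkpointedState field.

package org.apache.flink.streaming.state.checkpoint;

import org.apache.flink.streaming.state.OperatorState;

// Hypothetical incremental checkpoint: each update() adds the newer
// checkpoint's value to the running total instead of replacing it.
public class CountingCheckpoint extends StateCheckpoint<Long> {

        private static final long serialVersionUID = 1L;

        public CountingCheckpoint(OperatorState<Long> operatorState) {
                super(operatorState);
        }

        @Override
        public StateCheckpoint<Long> update(StateCheckpoint<Long> nextCheckpoint) {
                Long delta = nextCheckpoint.getCheckpointedState();
                if (this.checkpointedState == null) {
                        this.checkpointedState = delta;
                } else if (delta != null) {
                        this.checkpointedState = this.checkpointedState + delta;
                }
                return this;
        }
}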
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
new file mode 100755
index 0000000..691b111
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import akka.actor.ActorRef;
+
+public class ClusterUtil {
+
+       private static final Logger LOG = LoggerFactory.getLogger(ClusterUtil.class);
+       public static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job";
+
+       /**
+        * Executes the given JobGraph locally on a LocalFlinkMiniCluster.
+        * 
+        * @param jobGraph
+        *            The JobGraph to execute.
+        * @param degreeOfParallelism
+        *            The degree of parallelism, also used as the number of task slots.
+        * @param memorySize
+        *            The TaskManager memory size to configure (the overload below passes -1).
+        */
+       public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfParallelism, long memorySize)
+                       throws Exception {
+
+               Configuration configuration = jobGraph.getJobConfiguration();
+
+               LocalFlinkMiniCluster exec = null;
+
+               configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
+               configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, degreeOfParallelism);
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Running on mini cluster");
+               }
+
+               try {
+                       exec = new LocalFlinkMiniCluster(configuration, true);
+                       ActorRef jobClient = exec.getJobClient();
+
+                       JobClient.submitJobAndWait(jobGraph, true, jobClient, exec.timeout());
+
+               } catch (Exception e) {
+                       throw e;
+               } finally {
+                       if (exec != null) {
+                               exec.stop();
+                       }
+               }
+       }
+
+       public static void runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
+               runOnMiniCluster(jobGraph, numOfSlots, -1);
+       }
+}

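A short caller sketch for the helper above; it is not part of the commit, the class and method names are invented, and the JobGraph is assumed to have been built elsewhere (for example by the streaming job graph builder). The slot count and memory value simply mirror what the streaming tests later in this commit use.

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.util.ClusterUtil;

public class ClusterUtilUsageSketch {
        // Runs a pre-built JobGraph on the local mini cluster with 4 task slots and the
        // small memory budget (32) the tests below use; the two-argument overload instead
        // passes -1 to leave the memory setting alone.
        public static void runLocally(JobGraph jobGraph) throws Exception {
                ClusterUtil.runOnMiniCluster(jobGraph, 4, 32);
        }
}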
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
new file mode 100644
index 0000000..73e6360
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.keys;
+
+import java.lang.reflect.Array;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+
+public class KeySelectorUtil {
+
+       public static Class<?>[] tupleClasses = new Class[] { Tuple1.class, Tuple2.class, Tuple3.class,
+                       Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class,
+                       Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class,
+                       Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class,
+                       Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class,
+                       Tuple25.class };
+
+       public static <X> KeySelector<X, ?> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo) {
+               int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
+               int keyLength = logicalKeyPositions.length;
+               boolean[] orders = new boolean[keyLength];
+               TypeComparator<X> comparator = ((CompositeType<X>) typeInfo).createComparator(
+                               logicalKeyPositions, orders, 0);
+               return new ComparableKeySelector<X>(comparator, keyLength);
+       }
+
+       public static class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> {
+
+               private static final long serialVersionUID = 1L;
+
+               private TypeComparator<IN> comparator;
+               private int keyLength;
+               private Object[] keyArray;
+               private Tuple key;
+
+               public ComparableKeySelector(TypeComparator<IN> comparator, int keyLength) {
+                       this.comparator = comparator;
+                       this.keyLength = keyLength;
+                       keyArray = new Object[keyLength];
+                       try {
+                               key = (Tuple) tupleClasses[keyLength - 1].newInstance();
+                       } catch (Exception e) { // ignored: every Tuple class has a public no-argument constructor
+                       }
+               }
+
+               @Override
+               public Tuple getKey(IN value) throws Exception {
+                       comparator.extractKeys(value, keyArray, 0);
+                       for (int i = 0; i < keyLength; i++) {
+                               key.setField(keyArray[i], i);
+                       }
+                       return key;
+               }
+
+       }
+
+       public static class ArrayKeySelector<IN> implements KeySelector<IN, Tuple> {
+
+               private static final long serialVersionUID = 1L;
+
+               Tuple key;
+               int[] fields;
+
+               public ArrayKeySelector(int... fields) {
+                       this.fields = fields;
+                       try {
+                               key = (Tuple) tupleClasses[fields.length - 1].newInstance();
+                       } catch (Exception e) { // ignored: every Tuple class has a public no-argument constructor
+                       }
+               }
+
+               @Override
+               public Tuple getKey(IN value) throws Exception {
+                       for (int i = 0; i < fields.length; i++) {
+                               int pos = fields[i];
+                               key.setField(Array.get(value, pos), i); // pos is already the selected field index
+                       }
+                       return key;
+               }
+       }
+}

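A compact usage sketch for the selector factory above; it is not part of the commit and the class name is invented. It mirrors the call that AggregationFunctionTest, further down in this commit, makes to key Tuple2 records on field 0.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;

public class KeySelectorUtilUsageSketch {
        public static void main(String[] args) throws Exception {
                // Build a selector that keys Tuple2<Integer, Integer> records on field 0.
                TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
                                .getForObject(new Tuple2<Integer, Integer>(1, 1));
                KeySelector<Tuple2<Integer, Integer>, ?> selector = KeySelectorUtil.getSelectorForKeys(
                                new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, typeInfo),
                                typeInfo);
                // The selector returns a Tuple holding the extracted key field(s); here a Tuple1.
                System.out.println(selector.getKey(new Tuple2<Integer, Integer>(42, 7))); // (42)
        }
}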
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
new file mode 100644
index 0000000..49cd497
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
+import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
+import org.apache.flink.streaming.util.MockContext;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.junit.Test;
+
+public class AggregationFunctionTest {
+
+       @Test
+       public void groupSumIntegerTest() {
+
+               List<Tuple2<Integer, Integer>> expectedSumList = new ArrayList<Tuple2<Integer, Integer>>();
+               List<Tuple2<Integer, Integer>> expectedMinList = new ArrayList<Tuple2<Integer, Integer>>();
+               List<Tuple2<Integer, Integer>> expectedMaxList = new ArrayList<Tuple2<Integer, Integer>>();
+               List<Integer> expectedSumList0 = new ArrayList<Integer>();
+               List<Integer> expectedMinList0 = new ArrayList<Integer>();
+               List<Integer> expectedMaxList0 = new ArrayList<Integer>();
+               List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<Tuple2<Integer, Integer>>();
+               List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<Tuple2<Integer, Integer>>();
+               List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<Tuple2<Integer, Integer>>();
+
+               List<Integer> simpleInput = new ArrayList<Integer>();
+
+               int groupedSum0 = 0;
+               int groupedSum1 = 0;
+               int groupedSum2 = 0;
+
+               for (int i = 0; i < 9; i++) {
+                       simpleInput.add(i);
+                       expectedSumList.add(new Tuple2<Integer, Integer>(i % 3, (i + 1) * i / 2));
+                       expectedMinList.add(new Tuple2<Integer, Integer>(i % 3, 0));
+                       expectedMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
+
+                       expectedSumList0.add((i + 1) * i / 2);
+                       expectedMaxList0.add(i);
+                       expectedMinList0.add(0);
+
+                       int groupedSum;
+                       switch (i % 3) {
+                       case 0:
+                               groupedSum = groupedSum0 += i;
+                               break;
+                       case 1:
+                               groupedSum = groupedSum1 += i;
+                               break;
+                       default:
+                               groupedSum = groupedSum2 += i;
+                               break;
+                       }
+
+                       expectedGroupSumList.add(new Tuple2<Integer, Integer>(i % 3, groupedSum));
+                       expectedGroupMinList.add(new Tuple2<Integer, Integer>(i % 3, i % 3));
+                       expectedGroupMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
+               }
+
+               TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
+                               .getForObject(new Tuple2<Integer, Integer>(0, 0));
+               TypeInformation<Integer> type2 = TypeExtractor.getForObject(2);
+
+               ReduceFunction<Tuple2<Integer, Integer>> sumFunction = SumAggregator.getSumFunction(1,
+                               Integer.class, type1);
+               ReduceFunction<Integer> sumFunction0 = SumAggregator
+                               .getSumFunction(0, Integer.class, type2);
+               ReduceFunction<Tuple2<Integer, Integer>> minFunction = ComparableAggregator.getAggregator(
+                               1, type1, AggregationType.MIN);
+               ReduceFunction<Integer> minFunction0 = ComparableAggregator.getAggregator(0, type2,
+                               AggregationType.MIN);
+               ReduceFunction<Tuple2<Integer, Integer>> maxFunction = ComparableAggregator.getAggregator(
+                               1, type1, AggregationType.MAX);
+               ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0, type2,
+                               AggregationType.MAX);
+               List<Tuple2<Integer, Integer>> sumList = MockContext.createAndExecute(
+                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList());
+
+               List<Tuple2<Integer, Integer>> minList = MockContext.createAndExecute(
+                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(minFunction), getInputList());
+
+               List<Tuple2<Integer, Integer>> maxList = MockContext.createAndExecute(
+                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList());
+
+               TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
+                               .getForObject(new Tuple2<Integer, Integer>(1, 1));
+
+               KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+                               new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, typeInfo),
+                               typeInfo);
+
+               List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
+                               new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, keySelector),
+                               getInputList());
+
+               List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
+                               new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction, keySelector),
+                               getInputList());
+
+               List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
+                               new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, keySelector),
+                               getInputList());
+
+               assertEquals(expectedSumList, sumList);
+               assertEquals(expectedMinList, minList);
+               assertEquals(expectedMaxList, maxList);
+               assertEquals(expectedGroupSumList, groupedSumList);
+               assertEquals(expectedGroupMinList, groupedMinList);
+               assertEquals(expectedGroupMaxList, groupedMaxList);
+               assertEquals(expectedSumList0, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
+               assertEquals(expectedMinList0, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Integer>(minFunction0), simpleInput));
+               assertEquals(expectedMaxList0, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+               try {
+                       env.generateSequence(1, 100).min(1);
+                       fail();
+               } catch (Exception e) {
+                       // Nothing to do here
+               }
+               try {
+                       env.generateSequence(1, 100).min(2);
+                       fail();
+               } catch (Exception e) {
+                       // Nothing to do here
+               }
+               try {
+                       env.generateSequence(1, 100).min(3);
+                       fail();
+               } catch (Exception e) {
+                       // Nothing to do here
+               }
+
+               ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = ComparableAggregator
+                               .getAggregator(0, type1, AggregationType.MAXBY, true);
+               ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast = ComparableAggregator
+                               .getAggregator(0, type1, AggregationType.MAXBY, false);
+
+               ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst = ComparableAggregator
+                               .getAggregator(0, type1, AggregationType.MINBY, true);
+               ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast = ComparableAggregator
+                               .getAggregator(0, type1, AggregationType.MINBY, false);
+
+               List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+
+               List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
+
+               List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+
+               List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+
+               assertEquals(maxByFirstExpected, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
+                               getInputList()));
+               assertEquals(maxByLastExpected, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
+                               getInputList()));
+               assertEquals(minByLastExpected, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
+                               getInputList()));
+               assertEquals(minByFirstExpected, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
+                               getInputList()));
+
+       }
+
+       @Test
+       public void minMaxByTest() {
+               TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
+                               .getForObject(new Tuple2<Integer, Integer>(0, 0));
+
+               ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = ComparableAggregator
+                               .getAggregator(0, type1, AggregationType.MAXBY, true);
+               ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast = ComparableAggregator
+                               .getAggregator(0, type1, AggregationType.MAXBY, false);
+
+               ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst = ComparableAggregator
+                               .getAggregator(0, type1, AggregationType.MINBY, true);
+               ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast = ComparableAggregator
+                               .getAggregator(0, type1, AggregationType.MINBY, false);
+
+               List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+
+               List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
+               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
+
+               List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+
+               List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+
+               assertEquals(maxByFirstExpected, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
+                               getInputList()));
+               assertEquals(maxByLastExpected, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
+                               getInputList()));
+               assertEquals(minByLastExpected, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
+                               getInputList()));
+               assertEquals(minByFirstExpected, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
+                               getInputList()));
+       }
+
+       private List<Tuple2<Integer, Integer>> getInputList() {
+               ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
+               for (int i = 0; i < 9; i++) {
+                       inputList.add(new Tuple2<Integer, Integer>(i % 3, i));
+               }
+               return inputList;
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
new file mode 100644
index 0000000..6ad827a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class IterateTest {
+
+       private static final long MEMORYSIZE = 32;
+       private static boolean iterated = false;
+
+       public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
+                       if (value) {
+                               iterated = true;
+                       } else {
+                               out.collect(value);
+                       }
+
+               }
+
+       }
+
+       public static final class IterationTail extends RichFlatMapFunction<Boolean, Boolean> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
+                       out.collect(true);
+
+               }
+
+       }
+
+       public static final class MySink implements SinkFunction<Boolean> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void invoke(Boolean tuple) {
+               }
+
+       }
+
+       @Test
+       public void test() throws Exception {
+               StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+
+               env.setBufferTimeout(10);
+
+               DataStream<Boolean> source = env.fromElements(false, false, false);
+
+               IterativeDataStream<Boolean> iteration = source.iterate(3000);
+
+               DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).flatMap(
+                               new IterationTail());
+
+               iteration.closeWith(increment).addSink(new MySink());
+
+               env.execute();
+
+               assertTrue(iterated);
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
new file mode 100644
index 0000000..2486715
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class OutputSplitterTest {
+
+       private static final long MEMORYSIZE = 32;
+
+       private static ArrayList<Integer> splitterResult1 = new ArrayList<Integer>();
+       private static ArrayList<Integer> splitterResult2 = new ArrayList<Integer>();
+
+
+       private static ArrayList<Integer> expectedSplitterResult = new ArrayList<Integer>();
+
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testOnMergedDataStream() throws Exception {
+               splitterResult1.clear();
+               splitterResult2.clear();
+
+               StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+               env.setBufferTimeout(1);
+
+               DataStream<Integer> d1 = env.fromElements(0,2,4,6,8);
+               DataStream<Integer> d2 = env.fromElements(1,3,5,7,9);
+
+               d1 = d1.merge(d2);
+
+               d1.split(new OutputSelector<Integer>() {
+                       private static final long serialVersionUID = 8354166915727490130L;
+
+                       @Override
+                       public Iterable<String> select(Integer value) {
+                               List<String> s = new ArrayList<String>();
+                               if (value > 4) {
+                                       s.add(">");
+                               } else {
+                                       s.add("<");
+                               }
+                               return s;
+                       }
+               }).select(">").addSink(new SinkFunction<Integer>() {
+
+                       private static final long serialVersionUID = 5827187510526388104L;
+
+                       @Override
+                       public void invoke(Integer value) {
+                               splitterResult1.add(value);
+                       }
+               });
+
+               d1.split(new OutputSelector<Integer>() {
+                       private static final long serialVersionUID = -6822487543355994807L;
+
+                       @Override
+                       public Iterable<String> select(Integer value) {
+                               List<String> s = new ArrayList<String>();
+                               if (value % 3 == 0) {
+                                       s.add("yes");
+                               } else {
+                                       s.add("no");
+                               }
+                               return s;
+                       }
+               }).select("yes").addSink(new SinkFunction<Integer>() {
+                       private static final long serialVersionUID = -2674335071267854599L;
+
+                       @Override
+                       public void invoke(Integer value) {
+                               splitterResult2.add(value);
+                       }
+               });
+               env.execute();
+
+               Collections.sort(splitterResult1);
+               Collections.sort(splitterResult2);
+
+               expectedSplitterResult.clear();
+               expectedSplitterResult.addAll(Arrays.asList(5,6,7,8,9));
+               assertEquals(expectedSplitterResult, splitterResult1);
+
+               expectedSplitterResult.clear();
+               expectedSplitterResult.addAll(Arrays.asList(0,3,6,9));
+               assertEquals(expectedSplitterResult, splitterResult2);
+       }
+
+       @Test
+       public void testOnSingleDataStream() throws Exception {
+               splitterResult1.clear();
+               splitterResult2.clear();
+
+               StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+               env.setBufferTimeout(1);
+
+               DataStream<Integer> ds = env.fromElements(0,1,2,3,4,5,6,7,8,9);
+
+               ds.split(new OutputSelector<Integer>() {
+                       private static final long serialVersionUID = 2524335410904414121L;
+
+                       @Override
+                       public Iterable<String> select(Integer value) {
+                               List<String> s = new ArrayList<String>();
+                               if (value % 2 == 0) {
+                                       s.add("even");
+                               } else {
+                                       s.add("odd");
+                               }
+                               return s;
+                       }
+               }).select("even").addSink(new SinkFunction<Integer>() {
+
+                       private static final long serialVersionUID = -2995092337537209535L;
+
+                       @Override
+                       public void invoke(Integer value) {
+                               splitterResult1.add(value);
+                       }
+               });
+
+               ds.split(new OutputSelector<Integer>() {
+
+                       private static final long serialVersionUID = -511693919586034092L;
+
+                       @Override
+                       public Iterable<String> select(Integer value) {
+                               List<String> s = new ArrayList<String>();
+                               if (value % 4 == 0) {
+                                       s.add("yes");
+                               } else {
+                                       s.add("no");
+                               }
+                               return s;
+                       }
+               }).select("yes").addSink(new SinkFunction<Integer>() {
+
+                       private static final long serialVersionUID = -1749077049727705424L;
+
+                       @Override
+                       public void invoke(Integer value) {
+                               splitterResult2.add(value);
+                       }
+               });
+               env.execute();
+
+               Collections.sort(splitterResult1);
+               Collections.sort(splitterResult2);
+
+               expectedSplitterResult.clear();
+               expectedSplitterResult.addAll(Arrays.asList(0,2,4,6,8));
+               assertEquals(expectedSplitterResult, splitterResult1);
+
+               expectedSplitterResult.clear();
+               expectedSplitterResult.addAll(Arrays.asList(0,4,8));
+               assertEquals(expectedSplitterResult, splitterResult2);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
new file mode 100755
index 0000000..757f6f6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
+
+public class PrintTest implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+       private static final long MEMORYSIZE = 32;
+
+       private static final class IdentityMap implements MapFunction<Long, Long> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Long map(Long value) throws Exception {
+                       return value;
+               }
+       }
+
+       private static final class FilterAll implements FilterFunction<Long> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public boolean filter(Long value) throws Exception {
+                       return true;
+               }
+       }
+
+       @Test
+       public void test() throws Exception {
+               StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+               env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
+               env.execute();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
new file mode 100644
index 0000000..9be7de6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.source.FromElementsFunction;
+import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
+import org.apache.flink.streaming.util.MockCollector;
+import org.apache.flink.streaming.util.MockSource;
+import org.junit.Test;
+
+public class SourceTest {
+
+       @Test
+       public void fromElementsTest() {
+               List<Integer> expectedList = Arrays.asList(1, 2, 3);
+               List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(1,
+                               2, 3));
+               assertEquals(expectedList, actualList);
+       }
+
+       @Test
+       public void fromCollectionTest() {
+               List<Integer> expectedList = Arrays.asList(1, 2, 3);
+               List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(
+                               Arrays.asList(1, 2, 3)));
+               assertEquals(expectedList, actualList);
+       }
+
+       @Test
+       public void socketTextStreamTest() throws Exception {
+               List<String> expectedList = Arrays.asList("a", "b", "c");
+               List<String> actualList = new ArrayList<String>();
+
+               byte[] data = { 'a', '\n', 'b', '\n', 'c', '\n' };
+
+               Socket socket = mock(Socket.class);
+               when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data));
+               when(socket.isClosed()).thenReturn(false);
+               when(socket.isConnected()).thenReturn(true);
+
+               new SocketTextStreamFunction("", 0, '\n').streamFromSocket(new MockCollector<String>(
+                               actualList), socket);
+               assertEquals(expectedList, actualList);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
new file mode 100644
index 0000000..3da6b5f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
+
+public class WindowCrossJoinTest implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final long MEMORYSIZE = 32;
+
+       private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+       private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+
+       private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+       private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+
+       @Test
+       public void test() throws Exception {
+               StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+               env.setBufferTimeout(1);
+
+               ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
+               ArrayList<Tuple1<Integer>> in2 = new ArrayList<Tuple1<Integer>>();
+
+               in1.add(new Tuple2<Integer, String>(10, "a"));
+               in1.add(new Tuple2<Integer, String>(20, "b"));
+               in1.add(new Tuple2<Integer, String>(20, "x"));
+               in1.add(new Tuple2<Integer, String>(0, "y"));
+
+               in2.add(new Tuple1<Integer>(0));
+               in2.add(new Tuple1<Integer>(5));
+               in2.add(new Tuple1<Integer>(20));
+
+               joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(20, "b"), 20));
+               joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(20, "x"), 20));
+               joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(0, "y"), 0));
+
+               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(10, "a"), 0));
+               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(10, "a"), 5));
+               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(10, "a"), 20));
+               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(20, "b"), 0));
+               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(20, "b"), 5));
+               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(20, "b"), 20));
+               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(20, "x"), 0));
+               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(20, "x"), 5));
+               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(20, "x"), 20));
+               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(0, "y"), 0));
+               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(0, "y"), 5));
+               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+                               new Tuple2<Integer, String>(0, "y"), 20));
+
+               DataStream<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1);
+               DataStream<Tuple1<Integer>> inStream2 = env.fromCollection(in2);
+
+               inStream1
+                               .join(inStream2)
+                               .onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
+                                               new MyTimestamp<Tuple1<Integer>>(), 100).where(0).equalTo(0)
+                               .addSink(new JoinResultSink());
+
+               inStream1
+                               .cross(inStream2)
+                               .onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
+                                               new MyTimestamp<Tuple1<Integer>>(), 100)
+                               .with(new CrossFunction<Tuple2<Integer, String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() {
+
+                                       private static final long serialVersionUID = 1L;
+
+                                       @Override
+                                       public Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> cross(
+                                                       Tuple2<Integer, String> val1, Tuple1<Integer> val2) throws Exception {
+                                               return new Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>(val1, val2);
+                                       }
+                               }).addSink(new CrossResultSink());
+
+               env.execute();
+
+               assertEquals(joinExpectedResults, joinResults);
+               assertEquals(crossExpectedResults, crossResults);
+       }
+
+       private static class MyTimestamp<T> implements Timestamp<T> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public long getTimestamp(T value) {
+                       return 101L;
+               }
+       }
+
+       private static class JoinResultSink implements
+                       SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
+                       joinResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
+               }
+       }
+
+       private static class CrossResultSink implements
+                       SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
+                       crossResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
new file mode 100644
index 0000000..78cbbe5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class DirectedOutputTest {
+
+       private static final String TEN = "ten";
+       private static final String ODD = "odd";
+       private static final String ALL = "all";
+       private static final String EVEN_AND_ODD = "evenAndOdd";
+       private static final String ODD_AND_TEN = "oddAndTen";
+       private static final String EVEN = "even";
+       private static final String NON_SELECTED = "nonSelected";
+
+       static final class MyOutputSelector implements OutputSelector<Long> {
+               private static final long serialVersionUID = 1L;
+
+               List<String> outputs = new ArrayList<String>();
+
+               @Override
+               public Iterable<String> select(Long value) {
+                       outputs.clear();
+                       if (value % 2 == 0) {
+                               outputs.add(EVEN);
+                       } else {
+                               outputs.add(ODD);
+                       }
+
+                       if (value == 10L) {
+                               outputs.add(TEN);
+                       }
+
+                       if (value == 11L) {
+                               outputs.add(NON_SELECTED);
+                       }
+                       return outputs;
+               }
+       }
+
+       static final class ListSink implements SinkFunction<Long> {
+               private static final long serialVersionUID = 1L;
+
+               private String name;
+               private transient List<Long> list;
+
+               public ListSink(String name) {
+                       this.name = name;
+               }
+
+               @Override
+               public void invoke(Long value) {
+                       list.add(value);
+               }
+
+               private void readObject(java.io.ObjectInputStream in) throws IOException,
+                               ClassNotFoundException {
+                       in.defaultReadObject();
+                       outputs.put(name, new ArrayList<Long>());
+                       this.list = outputs.get(name);
+               }
+       }
+
+       private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
+
+       @Test
+       public void outputSelectorTest() throws Exception {
+               StreamExecutionEnvironment env = new TestStreamEnvironment(1, 128);
+
+               SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
+               source.select(EVEN).addSink(new ListSink(EVEN));
+               source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
+               source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
+               source.addSink(new ListSink(ALL));
+
+               env.execute();
+               assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
+               assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
+               assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
+                               outputs.get(EVEN_AND_ODD));
+               assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(ALL));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
new file mode 100755
index 0000000..1615a45
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.junit.Test;
+
+public class OutputSelectorTest {
+
+       static final class MyOutputSelector implements OutputSelector<Tuple1<Integer>> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Iterable<String> select(Tuple1<Integer> tuple) {
+
+                       String[] outputs = new String[tuple.f0];
+
+                       for (Integer i = 0; i < tuple.f0; i++) {
+                               outputs[i] = i.toString();
+                       }
+                       return Arrays.asList(outputs);
+               }
+       }
+
+       @Test
+       public void testGetOutputs() {
+               OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
+               List<String> expectedOutputs = new ArrayList<String>();
+               expectedOutputs.add("0");
+               expectedOutputs.add("1");
+               assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(2)));
+               expectedOutputs.add("2");
+               assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(3)));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
new file mode 100755
index 0000000..49b3bf8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
+import org.apache.flink.streaming.util.MockRecordWriterFactory;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class StreamCollectorTest {
+
+       @Test
+       public void testCollect() {
+               MockRecordWriter recWriter = MockRecordWriterFactory.create();
+               SerializationDelegate<StreamRecord<Tuple1<Integer>>> sd = new SerializationDelegate<StreamRecord<Tuple1<Integer>>>(
+                               null);
+               sd.setInstance(new StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>()));
+
+               Collector<Tuple1<Integer>> collector = new StreamOutput<Tuple1<Integer>>(recWriter, 2, sd);
+               collector.collect(new Tuple1<Integer>(3));
+               collector.collect(new Tuple1<Integer>(4));
+               collector.collect(new Tuple1<Integer>(5));
+               collector.collect(new Tuple1<Integer>(6));
+
+               assertArrayEquals(new Integer[] { 3, 4, 5, 6 }, recWriter.emittedRecords.toArray());
+       }
+
+       @Test
+       public void testClose() {
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
new file mode 100644
index 0000000..a91bd0c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class CoFlatMapTest implements Serializable {
+       private static final long serialVersionUID = 1L;
+
+       private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap1(String value, Collector<String> coll) {
+                       for (int i = 0; i < value.length(); i++) {
+                               coll.collect(value.substring(i, i + 1));
+                       }
+               }
+
+               @Override
+               public void flatMap2(Integer value, Collector<String> coll) {
+                       coll.collect(value.toString());
+               }
+       }
+
+       @Test
+       public void coFlatMapTest() {
+               CoFlatMapInvokable<String, Integer, String> invokable = new CoFlatMapInvokable<String, Integer, String>(
+                               new MyCoFlatMap());
+
+               List<String> expectedList = Arrays.asList("a", "b", "c", "1", "d", "e", "f", "2", "g", "h",
+                               "e", "3", "4", "5");
+               List<String> actualList = MockCoContext.createAndExecute(invokable,
+                               Arrays.asList("abc", "def", "ghe"), Arrays.asList(1, 2, 3, 4, 5));
+
+               assertEquals(expectedList, actualList);
+       }
+
+       @SuppressWarnings("unchecked")
+       @Test
+       public void multipleInputTest() {
+               LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+               DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
+               DataStream<Integer> ds2 = env.fromElements(2, 4).merge(ds1);
+               
+               try {
+                       ds1.forward().merge(ds2);
+                       fail();
+               } catch (RuntimeException e) {
+                       // expected
+               }
+               
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
new file mode 100644
index 0000000..273bbae
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
+import org.junit.Test;
+
+public class CoGroupedReduceTest {
+
+       private final static class MyCoReduceFunction implements
+                       CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
+                               Tuple3<String, String, String> value2) {
+                       return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2);
+               }
+
+               @Override
+               public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
+                               Tuple2<Integer, Integer> value2) {
+                       return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
+               }
+
+               @Override
+               public String map1(Tuple3<String, String, String> value) {
+                       return value.f1;
+               }
+
+               @Override
+               public String map2(Tuple2<Integer, Integer> value) {
+                       return value.f1.toString();
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Test
+       public void coGroupedReduceTest() {
+               Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
+               Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
+               Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
+               Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
+               Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
+               Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
+               Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
+               Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
+
+               KeySelector<Tuple3<String, String, String>, ?> keySelector0 = new KeySelector<Tuple3<String, String, String>, String>() {
+
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public String getKey(Tuple3<String, String, String> value) throws Exception {
+                               return value.f0;
+                       }
+               };
+
+               KeySelector<Tuple2<Integer, Integer>, ?> keySelector1 = new KeySelector<Tuple2<Integer, Integer>, Integer>() {
+
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
+                               return value.f0;
+                       }
+               };
+
+               KeySelector<Tuple3<String, String, String>, ?> keySelector2 = new KeySelector<Tuple3<String, String, String>, String>() {
+
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public String getKey(Tuple3<String, String, String> value) throws Exception {
+                               return value.f2;
+                       }
+               };
+
+               CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+                               new MyCoReduceFunction(), keySelector0, keySelector1);
+
+               List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
+                               "7");
+
+               List<String> actualList = MockCoContext.createAndExecute(invokable,
+                               Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
+
+               assertEquals(expected, actualList);
+
+               invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+                               new MyCoReduceFunction(), keySelector2, keySelector1);
+
+               expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
+
+               actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3),
+                               Arrays.asList(int1, int2, int3, int4, int5));
+
+               assertEquals(expected, actualList);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
new file mode 100644
index 0000000..93d1741
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
+import org.junit.Test;
+
+public class CoMapTest implements Serializable {
+       private static final long serialVersionUID = 1L;
+
+       private final static class MyCoMap implements CoMapFunction<Double, Integer, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String map1(Double value) {
+                       return value.toString();
+               }
+
+               @Override
+               public String map2(Integer value) {
+                       return value.toString();
+               }
+       }
+
+       @Test
+       public void coMapTest() {
+               CoMapInvokable<Double, Integer, String> invokable = new CoMapInvokable<Double, Integer, String>(new MyCoMap());
+
+               List<String> expectedList = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5");
+               List<String> actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3));
+               
+               assertEquals(expectedList, actualList);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
new file mode 100755
index 0000000..3343ba0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
+import org.junit.Test;
+
+public class CoStreamReduceTest {
+
+       public static class MyCoReduceFunction implements
+                       CoReduceFunction<Integer, String, Integer> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Integer reduce1(Integer value1, Integer value2) {
+                       return value1 * value2;
+               }
+
+               @Override
+               public String reduce2(String value1, String value2) {
+                       return value1 + value2;
+               }
+
+               @Override
+               public Integer map1(Integer value) {
+                       return value;
+               }
+
+               @Override
+               public Integer map2(String value) {
+                       return Integer.parseInt(value);
+               }
+
+       }
+
+       @Test
+       public void coStreamReduceTest() {
+
+               CoReduceInvokable<Integer, String, Integer> coReduce = new CoReduceInvokable<Integer, String, Integer>(
+                               new MyCoReduceFunction());
+
+               List<Integer> expected1 = Arrays.asList(1, 9, 2, 99, 6, 998, 24);
+               List<Integer> result = MockCoContext.createAndExecute(coReduce,
+                               Arrays.asList(1, 2, 3, 4), Arrays.asList("9", "9", "8"));
+
+               assertEquals(expected1, result);
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
new file mode 100644
index 0000000..4ab3492
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.function.co.CoWindowFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.util.MockCoContext;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class CoWindowTest {
+
+       public static final class MyCoGroup1 implements CoWindowFunction<Integer, Integer, Integer> {
+
+               private static final long serialVersionUID = 1L;
+
+               @SuppressWarnings("unused")
+               @Override
+               public void coWindow(List<Integer> first, List<Integer> second, Collector<Integer> out)
+                               throws Exception {
+                       Integer count1 = 0;
+                       for (Integer i : first) {
+                               count1++;
+                       }
+                       Integer count2 = 0;
+                       for (Integer i : second) {
+                               count2++;
+                       }
+                       out.collect(count1);
+                       out.collect(count2);
+
+               }
+
+       }
+
+       public static final class MyCoGroup2 implements
+                       CoWindowFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coWindow(List<Tuple2<Integer, Integer>> first,
+                               List<Tuple2<Integer, Integer>> second, Collector<Integer> out) throws Exception {
+
+                       Set<Integer> firstElements = new HashSet<Integer>();
+                       for (Tuple2<Integer, Integer> value : first) {
+                               firstElements.add(value.f1);
+                       }
+                       for (Tuple2<Integer, Integer> value : second) {
+                               if (firstElements.contains(value.f1)) {
+                                       out.collect(value.f1);
+                               }
+                       }
+
+               }
+
+       }
+
+       private static final class MyTS1 implements Timestamp<Integer> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public long getTimestamp(Integer value) {
+                       return value;
+               }
+
+       }
+
+       private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public long getTimestamp(Tuple2<Integer, Integer> value) {
+               }
+
+       }
+
+       @Test
+       public void coWindowGroupReduceTest2() throws Exception {
+
+               CoWindowInvokable<Integer, Integer, Integer> invokable1 = new CoWindowInvokable<Integer, Integer, Integer>(
+                               new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1),
+                               new TimestampWrapper<Integer>(new MyTS1(), 1));
+
+               // Windowsize 2, slide 1
+               // 1,2|2,3|3,4|4,5
+
+               List<Integer> input11 = new ArrayList<Integer>();
+               input11.add(1);
+               input11.add(1);
+               input11.add(2);
+               input11.add(3);
+               input11.add(3);
+
+               List<Integer> input12 = new ArrayList<Integer>();
+               input12.add(1);
+               input12.add(2);
+               input12.add(3);
+               input12.add(3);
+               input12.add(5);
+
+               // Windows: (1,1,2)(1,1,2)|(2,3,3)(2,3,3)|(3,3)(3,3)|(5)(5)
+               // expected output: 3,2|3,3|2,2|0,1
+
+               List<Integer> expected1 = new ArrayList<Integer>();
+               expected1.add(3);
+               expected1.add(2);
+               expected1.add(3);
+               expected1.add(3);
+               expected1.add(2);
+               expected1.add(2);
+               expected1.add(0);
+               expected1.add(1);
+
+               List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12);
+               assertEquals(expected1, actual1);
+
+               CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
+                               new MyCoGroup2(), 2, 3, new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(),
+                                               1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1));
+
+               // WindowSize 2, slide 3
+               // 1,2|4,5|7,8|
+
+               List<Tuple2<Integer, Integer>> input21 = new ArrayList<Tuple2<Integer, Integer>>();
+               input21.add(new Tuple2<Integer, Integer>(1, 1));
+               input21.add(new Tuple2<Integer, Integer>(1, 2));
+               input21.add(new Tuple2<Integer, Integer>(2, 3));
+               input21.add(new Tuple2<Integer, Integer>(3, 4));
+               input21.add(new Tuple2<Integer, Integer>(3, 5));
+               input21.add(new Tuple2<Integer, Integer>(4, 6));
+               input21.add(new Tuple2<Integer, Integer>(4, 7));
+               input21.add(new Tuple2<Integer, Integer>(5, 8));
+
+               List<Tuple2<Integer, Integer>> input22 = new ArrayList<Tuple2<Integer, Integer>>();
+               input22.add(new Tuple2<Integer, Integer>(1, 1));
+               input22.add(new Tuple2<Integer, Integer>(2, 0));
+               input22.add(new Tuple2<Integer, Integer>(2, 2));
+               input22.add(new Tuple2<Integer, Integer>(3, 9));
+               input22.add(new Tuple2<Integer, Integer>(3, 4));
+               input22.add(new Tuple2<Integer, Integer>(4, 10));
+               input22.add(new Tuple2<Integer, Integer>(5, 8));
+               input22.add(new Tuple2<Integer, Integer>(5, 7));
+
+               List<Integer> expected2 = new ArrayList<Integer>();
+               expected2.add(1);
+               expected2.add(2);
+               expected2.add(8);
+               expected2.add(7);
+
+               List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22);
+               assertEquals(expected2, actual2);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
new file mode 100644
index 0000000..969a06b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class CounterInvokableTest {
+
+       @Test
+       public void counterTest() {
+               CounterInvokable<String> invokable = new CounterInvokable<String>();
+
+               List<Long> expected = Arrays.asList(1L, 2L, 3L);
+               List<Long> actual = MockContext.createAndExecute(invokable, Arrays.asList("one", "two", "three"));
+               
+               assertEquals(expected, actual);
+       }
+}
