http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
new file mode 100644
index 0000000..403dd17
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class FilterTest implements Serializable {
+       private static final long serialVersionUID = 1L;
+
+       /** Filter that keeps only even integers. */
+       static class MyFilter implements FilterFunction<Integer> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public boolean filter(Integer value) throws Exception {
+                       final boolean odd = value % 2 != 0;
+                       return !odd;
+               }
+       }
+
+       /**
+        * Runs the filter invokable over the values 1 to 7 and expects the even
+        * values back in input order.
+        */
+       @Test
+       public void test() {
+               List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
+               List<Integer> output = MockContext.createAndExecute(
+                               new FilterInvokable<Integer>(new MyFilter()), input);
+
+               assertEquals(Arrays.asList(2, 4, 6), output);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
new file mode 100644
index 0000000..7424e21
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.util.MockContext;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class FlatMapTest {
+
+       /** Emits every even input followed by its square; odd inputs emit nothing. */
+       public static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+                       if (value % 2 != 0) {
+                               return;
+                       }
+                       out.collect(value);
+                       out.collect(value * value);
+               }
+       }
+
+       /**
+        * Feeds the values 1 to 8 through the flat-map invokable and checks the
+        * emitted sequence.
+        */
+       @Test
+       public void flatMapTest() {
+               List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
+               List<Integer> output = MockContext.createAndExecute(
+                               new FlatMapInvokable<Integer, Integer>(new MyFlatMap()), input);
+
+               assertEquals(Arrays.asList(2, 4, 4, 16, 6, 36, 8, 64), output);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
new file mode 100755
index 0000000..ce47c67
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class GroupedReduceInvokableTest {
+
+       private static class MyReducer implements ReduceFunction<Integer> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Integer reduce(Integer value1, Integer value2) throws 
Exception {
+                       return value1 + value2;
+               }
+
+       }
+
+       @Test
+       public void test() {
+               GroupedReduceInvokable<Integer> invokable1 = new 
GroupedReduceInvokable<Integer>(
+                               new MyReducer(), new KeySelector<Integer, 
Integer>() {
+
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public Integer getKey(Integer value) 
throws Exception {
+                                               return value;
+                                       }
+                               });
+
+               List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3);
+               List<Integer> actual = MockContext.createAndExecute(invokable1,
+                               Arrays.asList(1, 1, 2, 2, 3));
+
+               assertEquals(expected, actual);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
new file mode 100644
index 0000000..f38d5c1
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import 
org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper;
+import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class GroupedWindowInvokableTest {
+
+       /** Key selector that returns the String field (f1) of a tuple as its key. */
+       KeySelector<Tuple2<Integer, String>, ?> keySelector = new 
KeySelector<Tuple2<Integer, String>, String>() {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String getKey(Tuple2<Integer, String> value) throws 
Exception {
+                       return value.f1;
+               }
+       };
+
+       /**
+        * Tests that illegal arguments result in failure. The following cases 
are
+        * tested: 1) having no trigger 2) having no eviction 3) having neither
+        * eviction nor trigger 4) having both, central and distributed 
eviction.
+        */
+       @Test
+       public void testGroupedWindowInvokableFailTest() {
+
+               // create dummy reduce function
+               ReduceFunction<Object> userFunction = new 
ReduceFunction<Object>() {
+
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Object reduce(Object value1, Object value2) 
throws Exception {
+                               return null;
+                       }
+               };
+
+               // create dummy keySelector
+               KeySelector<Object, Object> keySelector = new 
KeySelector<Object, Object>() {
+
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Object getKey(Object value) throws Exception {
+                               return null;
+                       }
+               };
+
+               // create policy lists
+               LinkedList<CloneableEvictionPolicy<Object>> 
distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<Object>>();
+               LinkedList<CloneableTriggerPolicy<Object>> 
distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<Object>>();
+               LinkedList<EvictionPolicy<Object>> centralEvictionPolicies = 
new LinkedList<EvictionPolicy<Object>>();
+               LinkedList<TriggerPolicy<Object>> centralTriggerPolicies = new 
LinkedList<TriggerPolicy<Object>>();
+
+               // empty trigger and policy lists should fail
+               try {
+                       new GroupedWindowInvokable<Object, 
Object>(userFunction, keySelector,
+                                       distributedTriggerPolicies, 
distributedEvictionPolicies,
+                                       centralTriggerPolicies, 
centralEvictionPolicies);
+                       fail("Creating instance without any trigger or eviction 
policy should cause an UnsupportedOperationException but didn't. (1)");
+               } catch (UnsupportedOperationException e) {
+                       // that's the expected case
+               }
+
+               // null for trigger and policy lists should fail
+               try {
+                       new GroupedWindowInvokable<Object, 
Object>(userFunction, keySelector, null, null, null,
+                                       null);
+                       fail("Creating instance without any trigger or eviction 
policy should cause an UnsupportedOperationException but didn't. (2)");
+               } catch (UnsupportedOperationException e) {
+                       // that's the expected case
+               }
+
+               // empty eviction should still fail
+               centralTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
+               distributedTriggerPolicies.add(new 
CountTriggerPolicy<Object>(5));
+               try {
+                       new GroupedWindowInvokable<Object, 
Object>(userFunction, keySelector,
+                                       distributedTriggerPolicies, 
distributedEvictionPolicies,
+                                       centralTriggerPolicies, 
centralEvictionPolicies);
+                       fail("Creating instance without any eviction policy 
should cause an UnsupportedOperationException but didn't. (3)");
+               } catch (UnsupportedOperationException e) {
+                       // that's the expected case
+               }
+
+               // empty trigger should still fail
+               centralTriggerPolicies.clear();
+               distributedTriggerPolicies.clear();
+               centralEvictionPolicies.add(new CountEvictionPolicy<Object>(5));
+               try {
+                       new GroupedWindowInvokable<Object, 
Object>(userFunction, keySelector,
+                                       distributedTriggerPolicies, 
distributedEvictionPolicies,
+                                       centralTriggerPolicies, 
centralEvictionPolicies);
+                       fail("Creating instance without any trigger policy 
should cause an UnsupportedOperationException but didn't. (4)");
+               } catch (UnsupportedOperationException e) {
+                       // that's the expected case
+               }
+
+               // having both, central and distributed eviction, at the same 
time
+               // should fail
+               centralTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
+               distributedEvictionPolicies.add(new 
CountEvictionPolicy<Object>(5));
+               try {
+                       new GroupedWindowInvokable<Object, 
Object>(userFunction, keySelector,
+                                       distributedTriggerPolicies, 
distributedEvictionPolicies,
+                                       centralTriggerPolicies, 
centralEvictionPolicies);
+                       fail("Creating instance with central and distributed 
eviction should cause an UnsupportedOperationException but didn't. (4)");
+               } catch (UnsupportedOperationException e) {
+                       // that's the expected case
+               }
+
+       }
+
+       /**
+        * Test for not active (count based) triggers on a stream of plain
+        * integers that are grouped by their own value. The same input is run
+        * twice: first with a distributed trigger and distributed eviction, then
+        * with a central trigger and central eviction. Results are compared as
+        * sets plus element count, so output order is not checked.
+        */
+       @Test
+       public void testGroupedWindowInvokableDistributedTriggerSimple() {
+               List<Integer> inputs = new ArrayList<Integer>();
+               for (int element : new int[] { 1, 1, 5, 5, 5, 1, 1, 5, 1, 5 }) {
+                       inputs.add(element);
+               }
+
+               List<Integer> expectedDistributedEviction = new ArrayList<Integer>();
+               for (int element : new int[] { 15, 3, 3, 15 }) {
+                       expectedDistributedEviction.add(element);
+               }
+
+               List<Integer> expectedCentralEviction = new ArrayList<Integer>();
+               for (int element : new int[] { 2, 5, 15, 2, 5, 2, 5, 1, 5 }) {
+                       expectedCentralEviction.add(element);
+               }
+
+               // sums up the elements of a window
+               ReduceFunction<Integer> sum = new ReduceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Integer reduce(Integer value1, Integer value2) throws Exception {
+                               return value1 + value2;
+                       }
+               };
+
+               // every element is its own key
+               KeySelector<Integer, Integer> identityKey = new KeySelector<Integer, Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Integer getKey(Integer value) {
+                               return value;
+                       }
+               };
+
+               // ---- first run: distributed trigger and distributed eviction ----
+               LinkedList<CloneableTriggerPolicy<Integer>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Integer>>();
+               // Trigger on every 2nd element, but the first time after the 3rd
+               distributedTriggers.add(new CountTriggerPolicy<Integer>(2, -1));
+
+               LinkedList<CloneableEvictionPolicy<Integer>> distributedEvictions = new LinkedList<CloneableEvictionPolicy<Integer>>();
+               // On every 2nd element, remove the oldest 2 elements, but the first
+               // time after the 3rd element
+               distributedEvictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
+
+               LinkedList<TriggerPolicy<Integer>> centralTriggers = new LinkedList<TriggerPolicy<Integer>>();
+
+               GroupedWindowInvokable<Integer, Integer> distributedRun = new GroupedWindowInvokable<Integer, Integer>(
+                               sum, identityKey, distributedTriggers, distributedEvictions, centralTriggers, null);
+
+               List<Integer> distributedOutput = MockContext.createAndExecute(distributedRun, inputs);
+
+               assertEquals(new HashSet<Integer>(expectedDistributedEviction),
+                               new HashSet<Integer>(distributedOutput));
+               assertEquals(expectedDistributedEviction.size(), distributedOutput.size());
+
+               // ---- second run: central trigger and central eviction ----
+               distributedTriggers.clear();
+               centralTriggers.add(new CountTriggerPolicy<Integer>(2, -1));
+               LinkedList<EvictionPolicy<Integer>> centralEvictions = new LinkedList<EvictionPolicy<Integer>>();
+               centralEvictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
+
+               GroupedWindowInvokable<Integer, Integer> centralRun = new GroupedWindowInvokable<Integer, Integer>(
+                               sum, identityKey, distributedTriggers, null, centralTriggers, centralEvictions);
+
+               List<Integer> centralOutput = MockContext.createAndExecute(centralRun, inputs);
+
+               assertEquals(new HashSet<Integer>(expectedCentralEviction),
+                               new HashSet<Integer>(centralOutput));
+               assertEquals(expectedCentralEviction.size(), centralOutput.size());
+       }
+
+       /**
+        * Test for non active distributed triggers with separated key field:
+        * the tuples are grouped by their String field (via the keySelector field
+        * of this class) and each window is reduced to the tuple with the
+        * smallest Integer field. The result is compared as a set plus element
+        * count, so output order is not checked.
+        */
+       @Test
+       public void testGroupedWindowInvokableDistributedTriggerComplex() {
+               List<Tuple2<Integer, String>> inputs2 = new ArrayList<Tuple2<Integer, String>>();
+               inputs2.add(new Tuple2<Integer, String>(1, "a"));
+               inputs2.add(new Tuple2<Integer, String>(0, "b"));
+               inputs2.add(new Tuple2<Integer, String>(2, "a"));
+               inputs2.add(new Tuple2<Integer, String>(-1, "a"));
+               inputs2.add(new Tuple2<Integer, String>(-2, "a"));
+               inputs2.add(new Tuple2<Integer, String>(10, "a"));
+               inputs2.add(new Tuple2<Integer, String>(2, "b"));
+               inputs2.add(new Tuple2<Integer, String>(1, "a"));
+
+               List<Tuple2<Integer, String>> expected2 = new ArrayList<Tuple2<Integer, String>>();
+               expected2.add(new Tuple2<Integer, String>(-1, "a"));
+               expected2.add(new Tuple2<Integer, String>(-2, "a"));
+               expected2.add(new Tuple2<Integer, String>(0, "b"));
+
+               LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> triggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>();
+               // Count trigger created with 3 -- presumably fires on every 3rd
+               // element of a group; TODO confirm against CountTriggerPolicy
+               triggers.add(new CountTriggerPolicy<Tuple2<Integer, String>>(3));
+
+               LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> evictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>();
+               // Tumbling eviction -- presumably empties the buffer whenever a
+               // trigger fired; TODO confirm against TumblingEvictionPolicy
+               evictions.add(new TumblingEvictionPolicy<Tuple2<Integer, String>>());
+
+               LinkedList<TriggerPolicy<Tuple2<Integer, String>>> centralTriggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
+
+               GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable2 = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
+                               new ReduceFunction<Tuple2<Integer, String>>() {
+                                       private static final long serialVersionUID = 1L;
+
+                                       // keeps the tuple with the smaller Integer field;
+                                       // on a tie the first argument wins
+                                       @Override
+                                       public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1,
+                                                       Tuple2<Integer, String> value2) throws Exception {
+                                               if (value1.f0 <= value2.f0) {
+                                                       return value1;
+                                               } else {
+                                                       return value2;
+                                               }
+                                       }
+                               }, keySelector, triggers, evictions, centralTriggers, null);
+
+               // the returned list is used directly; no extra copy is needed
+               List<Tuple2<Integer, String>> actual2 = MockContext.createAndExecute(invokable2, inputs2);
+
+               assertEquals(new HashSet<Tuple2<Integer, String>>(expected2),
+                               new HashSet<Tuple2<Integer, String>>(actual2));
+               assertEquals(expected2.size(), actual2.size());
+       }
+
+       /**
+        * Test for an active centralized (time based) trigger. The Integer field
+        * of each tuple serves as its timestamp and the String field as its
+        * group. The same input is run twice, first with distributed eviction and
+        * then with central eviction; both runs must yield the same result.
+        * Results are compared as sets plus element count, so output order is not
+        * checked.
+        */
+       @Test
+       public void testGroupedWindowInvokableCentralActiveTrigger() {
+
+               // (timestamp, group) pairs in arrival order
+               int[] stamps = { 1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5,
+                               10, 10, 10, 11, 11, 11, 11, 11, 11 };
+               String[] groups = { "a", "b", "c", "a", "b", "c", "b", "a", "c", "c", "a", "b",
+                               "a", "b", "c", "c", "a", "b", "b", "a", "c", "a", "a", "c", "c", "b", "b" };
+
+               List<Tuple2<Integer, String>> inputs = new ArrayList<Tuple2<Integer, String>>();
+               for (int i = 0; i < stamps.length; i++) {
+                       inputs.add(new Tuple2<Integer, String>(stamps[i], groups[i]));
+               }
+
+               // Expected result, for each group (a, b and c), per window of
+               // timestamps:
+               // 1..4 -> 12, 3..6 -> 12, 5..8 -> 5, 7..10 -> 10, 9..11 -> 32
+               List<Tuple2<Integer, String>> expected = new ArrayList<Tuple2<Integer, String>>();
+               for (int windowSum : new int[] { 12, 12, 5, 10, 32 }) {
+                       for (String group : new String[] { "a", "b", "c" }) {
+                               expected.add(new Tuple2<Integer, String>(windowSum, group));
+                       }
+               }
+
+               // the Integer field of a tuple is its timestamp
+               Timestamp<Tuple2<Integer, String>> fieldTimestamp = new Timestamp<Tuple2<Integer, String>>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public long getTimestamp(Tuple2<Integer, String> value) {
+                               return value.f0;
+                       }
+               };
+
+               TimestampWrapper<Tuple2<Integer, String>> timestampWrapper = new TimestampWrapper<Tuple2<Integer, String>>(
+                               fieldTimestamp, 1);
+
+               // adds up the Integer fields and keeps the group of the first tuple
+               ReduceFunction<Tuple2<Integer, String>> sumPerGroup = new ReduceFunction<Tuple2<Integer, String>>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1,
+                                       Tuple2<Integer, String> value2) throws Exception {
+                               int total = value1.f0 + value2.f0;
+                               return new Tuple2<Integer, String>(total, value1.f1);
+                       }
+               };
+
+               LinkedList<TriggerPolicy<Tuple2<Integer, String>>> centralTriggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
+               // Trigger every 2 time units but delay the first trigger by 2 (First
+               // trigger after 4, then every 2)
+               centralTriggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, timestampWrapper, 2L));
+
+               LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> distributedEvictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>();
+               // Always delete all elements older than 4
+               distributedEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, timestampWrapper));
+
+               LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>();
+
+               // ---- first run: central trigger with distributed eviction ----
+               GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> firstRun = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
+                               sumPerGroup, keySelector, distributedTriggers, distributedEvictions,
+                               centralTriggers, null);
+
+               List<Tuple2<Integer, String>> firstOutput = MockContext.createAndExecute(firstRun, inputs);
+
+               assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
+                               new HashSet<Tuple2<Integer, String>>(firstOutput));
+               assertEquals(expected.size(), firstOutput.size());
+
+               // ---- second run: repeat the test with central eviction. The result
+               // should be the same. ----
+               centralTriggers.clear();
+               centralTriggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, timestampWrapper, 2L));
+               distributedEvictions.clear();
+               LinkedList<EvictionPolicy<Tuple2<Integer, String>>> centralEvictions = new LinkedList<EvictionPolicy<Tuple2<Integer, String>>>();
+               centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, timestampWrapper));
+
+               GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> secondRun = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
+                               sumPerGroup, keySelector, distributedTriggers, distributedEvictions,
+                               centralTriggers, centralEvictions);
+
+               List<Tuple2<Integer, String>> secondOutput = MockContext.createAndExecute(secondRun, inputs);
+
+               assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
+                               new HashSet<Tuple2<Integer, String>>(secondOutput));
+               assertEquals(expected.size(), secondOutput.size());
+       }
+
+       /**
+        * Test for multiple centralized triggers
+        */
+       @Test
+       public void testGroupedWindowInvokableMultipleCentralTrigger() {
+               LinkedList<TriggerPolicy<Integer>> triggers = new 
LinkedList<TriggerPolicy<Integer>>();
+               triggers.add(new CountTriggerPolicy<Integer>(8));
+               triggers.add(new CountTriggerPolicy<Integer>(5));
+
+               LinkedList<CloneableEvictionPolicy<Integer>> evictions = new 
LinkedList<CloneableEvictionPolicy<Integer>>();
+               // The active wrapper causes eviction even on (fake) elements 
which
+               // triggered, but do not belong to the group.
+               evictions.add(new ActiveCloneableEvictionPolicyWrapper<Integer>(
+                               new TumblingEvictionPolicy<Integer>()));
+
+               LinkedList<CloneableTriggerPolicy<Integer>> distributedTriggers 
= new LinkedList<CloneableTriggerPolicy<Integer>>();
+
+               List<Integer> inputs = new ArrayList<Integer>();
+               inputs.add(1);
+               inputs.add(2);
+               inputs.add(2);
+               inputs.add(2);
+               inputs.add(1);
+               // 1st Trigger: 2;6
+               inputs.add(2);
+               inputs.add(1);
+               inputs.add(2);
+               // 2nd Trigger: 1;4
+               inputs.add(2);
+               inputs.add(1);
+               // Final: 1,2
+
+               List<Integer> expected = new ArrayList<Integer>();
+               expected.add(2);
+               expected.add(6);
+               expected.add(4);
+               expected.add(1);
+               expected.add(2);
+               expected.add(1);
+
+               ReduceFunction<Integer> myReduceFunction = new 
ReduceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Integer reduce(Integer value1, Integer value2) 
throws Exception {
+                               return value1 + value2;
+                       }
+               };
+
+               GroupedWindowInvokable<Integer, Integer> invokable = new 
GroupedWindowInvokable<Integer, Integer>(
+                               myReduceFunction, new KeySelector<Integer, 
Integer>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public Integer getKey(Integer value) {
+                                               return value;
+                                       }
+                               }, distributedTriggers, evictions, triggers, 
null);
+
+               ArrayList<Integer> result = new ArrayList<Integer>();
+               for (Integer t : MockContext.createAndExecute(invokable, 
inputs)) {
+                       result.add(t);
+               }
+
+               assertEquals(new HashSet<Integer>(expected), new 
HashSet<Integer>(result));
+               assertEquals(expected.size(), result.size());
+       }
+
+       /**
+        * Test for combination of centralized trigger and distributed trigger 
at
+        * the same time
+        */
+       @Test
+       public void testGroupedWindowInvokableCentralAndDistrTrigger() {
+               LinkedList<TriggerPolicy<Integer>> triggers = new 
LinkedList<TriggerPolicy<Integer>>();
+               triggers.add(new CountTriggerPolicy<Integer>(8));
+               triggers.add(new CountTriggerPolicy<Integer>(5));
+
+               LinkedList<CloneableEvictionPolicy<Integer>> evictions = new 
LinkedList<CloneableEvictionPolicy<Integer>>();
+               // The active wrapper causes eviction even on (fake) elements 
which
+               // triggered, but do not belong to the group.
+               evictions.add(new ActiveCloneableEvictionPolicyWrapper<Integer>(
+                               new TumblingEvictionPolicy<Integer>()));
+
+               LinkedList<CloneableTriggerPolicy<Integer>> distributedTriggers 
= new LinkedList<CloneableTriggerPolicy<Integer>>();
+               distributedTriggers.add(new CountTriggerPolicy<Integer>(2));
+
+               List<Integer> inputs = new ArrayList<Integer>();
+               inputs.add(1);
+               inputs.add(2);
+               inputs.add(2);
+               // local on 2 => 4
+               inputs.add(2);
+               inputs.add(1);
+               // local on 1 => 2
+               // and 1st Central: 2;2
+               // SUMS up to 2;2
+               inputs.add(2);
+               // local on 2 => 2
+               inputs.add(1);
+               inputs.add(2);
+               // 2nd Central: 1;2
+               inputs.add(2);
+               inputs.add(1);
+               // Final: 1,2
+
+               List<Integer> expected = new ArrayList<Integer>();
+               expected.add(4);
+               expected.add(2);
+               expected.add(2);
+               expected.add(2);
+               expected.add(1);
+               expected.add(2);
+               expected.add(1);
+               expected.add(2);
+
+               ReduceFunction<Integer> myReduceFunction = new 
ReduceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Integer reduce(Integer value1, Integer value2) 
throws Exception {
+                               return value1 + value2;
+                       }
+               };
+
+               GroupedWindowInvokable<Integer, Integer> invokable = new 
GroupedWindowInvokable<Integer, Integer>(
+                               myReduceFunction, new KeySelector<Integer, 
Integer>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public Integer getKey(Integer value) {
+                                               return value;
+                                       }
+                               }, distributedTriggers, evictions, triggers, 
null);
+
+               ArrayList<Integer> result = new ArrayList<Integer>();
+               for (Integer t : MockContext.createAndExecute(invokable, 
inputs)) {
+                       result.add(t);
+               }
+
+               assertEquals(new HashSet<Integer>(expected), new 
HashSet<Integer>(result));
+               assertEquals(expected.size(), result.size());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
new file mode 100644
index 0000000..5390ec9
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class MapTest {
+
+       private static class Map implements MapFunction<Integer, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String map(Integer value) throws Exception {
+                       return "+" + (value + 1);
+               }
+       }
+       
+       @Test
+       public void mapInvokableTest() {
+               MapInvokable<Integer, String> invokable = new 
MapInvokable<Integer, String>(new Map());
+               
+               List<String> expectedList = Arrays.asList("+2", "+3", "+4");
+               List<String> actualList = 
MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3));
+               
+               assertEquals(expectedList, actualList);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
new file mode 100644
index 0000000..11c44cd
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.StreamProjection;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class ProjectTest implements Serializable {
+       private static final long serialVersionUID = 1L;
+
+       @Test
+       public void test() {
+
+               TypeInformation<Tuple5<Integer, String, Integer, String, 
Integer>> inType = TypeExtractor
+                               .getForObject(new Tuple5<Integer, String, 
Integer, String, Integer>(2, "a", 3, "b",
+                                               4));
+
+               int[] fields = new int[] { 4, 4, 3 };
+               Class<?>[] classes = new Class<?>[] { Integer.class, 
Integer.class, String.class };
+
+               @SuppressWarnings("unchecked")
+               ProjectInvokable<Tuple5<Integer, String, Integer, String, 
Integer>, Tuple3<Integer, Integer, String>> invokable = new 
ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, 
Tuple3<Integer, Integer, String>>(
+                               fields,
+                               (TypeInformation<Tuple3<Integer, Integer, 
String>>) StreamProjection
+                                               .extractFieldTypes(fields, 
classes, inType));
+
+               List<Tuple5<Integer, String, Integer, String, Integer>> input = 
new ArrayList<Tuple5<Integer, String, Integer, String, Integer>>();
+               input.add(new Tuple5<Integer, String, Integer, String, 
Integer>(2, "a", 3, "b", 4));
+               input.add(new Tuple5<Integer, String, Integer, String, 
Integer>(2, "s", 3, "c", 2));
+               input.add(new Tuple5<Integer, String, Integer, String, 
Integer>(2, "a", 3, "c", 2));
+               input.add(new Tuple5<Integer, String, Integer, String, 
Integer>(2, "a", 3, "a", 7));
+
+               List<Tuple3<Integer, Integer, String>> expected = new 
ArrayList<Tuple3<Integer, Integer, String>>();
+               expected.add(new Tuple3<Integer, Integer, String>(4, 4, "b"));
+               expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
+               expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
+               expected.add(new Tuple3<Integer, Integer, String>(7, 7, "a"));
+
+               assertEquals(expected, MockContext.createAndExecute(invokable, 
input));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
new file mode 100755
index 0000000..ae866e6
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class StreamReduceTest {
+
+       private static class MyReducer implements ReduceFunction<Integer>{
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Integer reduce(Integer value1, Integer value2) throws 
Exception {
+                       return value1+value2;
+               }
+               
+       }
+       
+       @Test
+       public void test() {
+               StreamReduceInvokable<Integer> invokable1 = new 
StreamReduceInvokable<Integer>(
+                               new MyReducer());
+
+               List<Integer> expected = Arrays.asList(1,2,4,7,10);
+               List<Integer> actual = MockContext.createAndExecute(invokable1,
+                               Arrays.asList(1, 1, 2, 3, 3));
+
+               assertEquals(expected, actual);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
new file mode 100644
index 0000000..83b4596
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class WindowInvokableTest {
+
+       /**
+        * Test case equal to {@link WindowReduceInvokableTest}
+        */
+       @Test
+       public void testWindowInvokableWithTimePolicy() {
+
+               List<Integer> inputs = new ArrayList<Integer>();
+               inputs.add(1);
+               inputs.add(2);
+               inputs.add(2);
+               inputs.add(3);
+               inputs.add(4);
+               inputs.add(5);
+               inputs.add(10);
+               inputs.add(11);
+               inputs.add(11);
+               // 1,2,3,4-3,4,5,6-5,6,7,8-7,8,9,10-9,10,11
+               // 12-12-5-10-32
+
+               List<Integer> expected = new ArrayList<Integer>();
+               expected.add(12);
+               expected.add(12);
+               expected.add(5);
+               expected.add(10);
+               expected.add(32);
+
+               Timestamp<Integer> myTimeStamp = new Timestamp<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public long getTimestamp(Integer value) {
+                               return value;
+                       }
+               };
+
+               ReduceFunction<Integer> myReduceFunction = new 
ReduceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Integer reduce(Integer value1, Integer value2) 
throws Exception {
+                               return value1 + value2;
+                       }
+               };
+
+               LinkedList<TriggerPolicy<Integer>> triggers = new 
LinkedList<TriggerPolicy<Integer>>();
+               // Trigger every 2 time units but delay the first trigger by 2 
(First
+               // trigger after 4, then every 2)
+               triggers.add(new TimeTriggerPolicy<Integer>(2L, new 
TimestampWrapper<Integer>(myTimeStamp,
+                               1), 2L));
+
+               LinkedList<EvictionPolicy<Integer>> evictions = new 
LinkedList<EvictionPolicy<Integer>>();
+               // Always delete all elements older than 4
+               evictions.add(new TimeEvictionPolicy<Integer>(4L, new 
TimestampWrapper<Integer>(
+                               myTimeStamp, 1)));
+
+               WindowInvokable<Integer, Integer> invokable = new 
WindowReduceInvokable<Integer>(
+                               myReduceFunction, triggers, evictions);
+
+               ArrayList<Integer> result = new ArrayList<Integer>();
+               for (Integer t : MockContext.createAndExecute(invokable, 
inputs)) {
+                       result.add(t);
+               }
+
+               assertEquals(expected, result);
+       }
+
+       /**
+        * Test case equal to {@link BatchReduceTest}
+        */
+       @Test
+       public void testWindowInvokableWithCountPolicy() {
+
+               List<Integer> inputs = new ArrayList<Integer>();
+               for (Integer i = 1; i <= 10; i++) {
+                       inputs.add(i);
+               }
+
+               ReduceFunction<Integer> myReduceFunction = new 
ReduceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Integer reduce(Integer value1, Integer value2) 
throws Exception {
+                               return value1 + value2;
+                       }
+               };
+
+               /*
+                * The following setup resembles the batch size 3 and the 
slide size 2
+                * of the BatchReduceInvokable.
+                */
+               LinkedList<TriggerPolicy<Integer>> triggers = new 
LinkedList<TriggerPolicy<Integer>>();
+               // Trigger on every 2nd element, but the first time after the 
3rd
+               triggers.add(new CountTriggerPolicy<Integer>(2, -1));
+
+               LinkedList<EvictionPolicy<Integer>> evictions = new 
LinkedList<EvictionPolicy<Integer>>();
+               // On every 2nd element, remove the oldest 2 elements, but the 
first
+               // time after the 3rd element
+               evictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
+
+               WindowInvokable<Integer, Integer> invokable = new 
WindowReduceInvokable<Integer>(
+                               myReduceFunction, triggers, evictions);
+
+               List<Integer> expected = new ArrayList<Integer>();
+               expected.add(6);
+               expected.add(12);
+               expected.add(18);
+               expected.add(24);
+               expected.add(19);
+               List<Integer> result = new ArrayList<Integer>();
+               for (Integer t : MockContext.createAndExecute(invokable, 
inputs)) {
+                       result.add(t);
+               }
+               assertEquals(expected, result);
+
+               /*
+                * Begin test part 2
+                */
+
+               List<Integer> inputs2 = new ArrayList<Integer>();
+               inputs2.add(1);
+               inputs2.add(2);
+               inputs2.add(-5); // changed this value to make sure it is 
excluded from
+                                                       // the result
+               inputs2.add(-3);
+               inputs2.add(-4);
+
+               myReduceFunction = new ReduceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Integer reduce(Integer value1, Integer value2) 
throws Exception {
+                               if (value1 <= value2) {
+                                       return value1;
+                               } else {
+                                       return value2;
+                               }
+                       };
+               };
+
+               /*
+                * The following setup resembles the batch size 2 and the 
slide size 3
+                * of the BatchReduceInvokable.
+                */
+               triggers = new LinkedList<TriggerPolicy<Integer>>();
+               // Trigger after every 3rd element, but the first time after 
the 2nd
+               triggers.add(new CountTriggerPolicy<Integer>(3, 1));
+
+               evictions = new LinkedList<EvictionPolicy<Integer>>();
+               // On every 3rd element, remove the oldest 3 elements, but the 
first
+               // time on the 5th element
+               evictions.add(new CountEvictionPolicy<Integer>(3, 3, -1));
+
+               WindowInvokable<Integer, Integer> invokable2 = new 
WindowReduceInvokable<Integer>(
+                               myReduceFunction, triggers, evictions);
+
+               List<Integer> expected2 = new ArrayList<Integer>();
+               expected2.add(1);
+               expected2.add(-4);
+
+               result = new ArrayList<Integer>();
+               for (Integer t : MockContext.createAndExecute(invokable2, 
inputs2)) {
+                       result.add(t);
+               }
+
+               assertEquals(expected2, result);
+
+       }
+
+       @Test
+       public void testWindowInvokableWithMultiplePolicies() {
+               LinkedList<TriggerPolicy<Integer>> triggers = new 
LinkedList<TriggerPolicy<Integer>>();
+               triggers.add(new CountTriggerPolicy<Integer>(2));
+               triggers.add(new CountTriggerPolicy<Integer>(3));
+
+               LinkedList<EvictionPolicy<Integer>> evictions = new 
LinkedList<EvictionPolicy<Integer>>();
+               evictions.add(new CountEvictionPolicy<Integer>(2, 2));
+               evictions.add(new CountEvictionPolicy<Integer>(3, 3));
+
+               List<Integer> inputs = new ArrayList<Integer>();
+               for (Integer i = 1; i <= 10; i++) {
+                       inputs.add(i);
+               }
+               /**
+                * <code>
+                * VAL: 1,2,3,4,5,6,7,8,9,10
+                * TR1:   |   |   |   |   |
+                * TR2:     |     |     |
+                * EV1:   2   2   2   2   2
+                * EV2:     3     3     3
+                * </code>
+                */
+
+               List<Integer> expected = new ArrayList<Integer>();
+               expected.add(3);
+               expected.add(3);
+               expected.add(4);
+               expected.add(11);
+               expected.add(15);
+               expected.add(9);
+               expected.add(10);
+
+               ReduceFunction<Integer> myReduceFunction = new 
ReduceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Integer reduce(Integer value1, Integer value2) 
throws Exception {
+                               return value1 + value2;
+                       }
+               };
+
+               WindowInvokable<Integer, Integer> invokable = new 
WindowReduceInvokable<Integer>(
+                               myReduceFunction, triggers, evictions);
+
+               ArrayList<Integer> result = new ArrayList<Integer>();
+               for (Integer t : MockContext.createAndExecute(invokable, 
inputs)) {
+                       result.add(t);
+               }
+
+               assertEquals(expected, result);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/UIDTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/UIDTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/UIDTest.java
new file mode 100644
index 0000000..b044d88
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/UIDTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamrecord;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.junit.Test;
+
+public class UIDTest {
+
+       //TODO fix with matching DataOutputStream and DataOutputView
+       @Test
+       public void test() throws IOException {
+               DataOutputSerializer out = new DataOutputSerializer(64);
+               
+               UID id = new UID(3);
+               id.write(out);
+
+               ByteBuffer buff = out.wrapAsByteBuffer();
+
+               
+               DataInputDeserializer in = new DataInputDeserializer(buff);
+               
+               UID id2 = new UID();
+               id2.read(in);
+
+               assertEquals(id.getChannelId(), id2.getChannelId());
+               assertArrayEquals(id.getGeneratedId(), id2.getGeneratedId());
+               assertArrayEquals(id.getId(), id2.getId());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java
new file mode 100755
index 0000000..a40048e
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.util.ArrayList;
+
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.runtime.operators.DataSourceTask;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+
+public class MockRecordWriter extends 
RecordWriter<SerializationDelegate<StreamRecord<Tuple1<Integer>>>> {
+
+       public ArrayList<Integer> emittedRecords;
+
+       public MockRecordWriter(DataSourceTask<?> inputBase, 
Class<StreamRecord<Tuple1<Integer>>> outputClass) {
+               super(inputBase.getEnvironment().getWriter(0));
+       }
+
+       public boolean initList() {
+               emittedRecords = new ArrayList<Integer>();
+               return true;
+       }
+       
+       @Override
+       public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> 
record) {
+               emittedRecords.add(record.getInstance().getObject().f0);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
new file mode 100644
index 0000000..b103d84
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+/**
+ * Builds small streaming topologies and checks both the argument validation of
+ * the environment API and the results produced by executed jobs.
+ */
+public class StreamVertexTest {
+
+       /** Written by {@link MySink} and inspected by {@link #runStream()}. */
+       private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
+
+       /** Emits ten tuples carrying the values 0..9, reusing a single tuple instance. */
+       public static class MySource implements SourceFunction<Tuple1<Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               private Tuple1<Integer> tuple = new Tuple1<Integer>(0);
+
+               @Override
+               public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
+                       for (int i = 0; i < 10; i++) {
+                               tuple.f0 = i;
+                               collector.collect(tuple);
+                       }
+               }
+       }
+
+       /** Maps a one-field tuple {@code (i)} to the pair {@code (i, i + 1)}. */
+       public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws Exception {
+                       Integer i = value.f0;
+                       return new Tuple2<Integer, Integer>(i, i + 1);
+               }
+       }
+
+       /** Stores every received pair in {@link #data}, keyed by its first field. */
+       public static class MySink implements SinkFunction<Tuple2<Integer, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void invoke(Tuple2<Integer, Integer> tuple) {
+                       Integer k = tuple.getField(0);
+                       Integer v = tuple.getField(1);
+                       data.put(k, v);
+               }
+       }
+
+       @SuppressWarnings("unused")
+       private static final int PARALLELISM = 1;
+       private static final int SOURCE_PARALLELISM = 1;
+       private static final long MEMORYSIZE = 32;
+
+       /**
+        * Calls several environment and stream methods with invalid arguments and
+        * expects each call to be rejected with the listed exception type.
+        */
+       @Test
+       public void wrongJobGraph() {
+               LocalStreamEnvironment env = StreamExecutionEnvironment
+                               .createLocalEnvironment(SOURCE_PARALLELISM);
+
+               try {
+                       env.fromCollection(null);
+                       fail("fromCollection(null) should throw a NullPointerException");
+               } catch (NullPointerException expected) {
+                       // expected: the invalid argument was rejected
+               }
+
+               try {
+                       env.fromElements();
+                       fail("fromElements() without elements should throw an IllegalArgumentException");
+               } catch (IllegalArgumentException expected) {
+                       // expected: the invalid argument was rejected
+               }
+
+               try {
+                       env.generateSequence(-10, -30);
+                       fail("generateSequence(-10, -30) should throw an IllegalArgumentException");
+               } catch (IllegalArgumentException expected) {
+                       // expected: the invalid argument was rejected
+               }
+
+               try {
+                       env.setBufferTimeout(-10);
+                       fail("setBufferTimeout(-10) should throw an IllegalArgumentException");
+               } catch (IllegalArgumentException expected) {
+                       // expected: the invalid argument was rejected
+               }
+
+               try {
+                       env.generateSequence(1, 10).project(2);
+                       fail("project(2) on a sequence stream should throw a RuntimeException");
+               } catch (RuntimeException expected) {
+                       // expected: the invalid projection was rejected
+               }
+       }
+
+       /** Passes strings through unchanged and converts longs to their string form. */
+       private static class CoMap implements CoMapFunction<String, Long, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String map1(String value) {
+                       return value;
+               }
+
+               @Override
+               public String map2(Long value) {
+                       return value.toString();
+               }
+       }
+
+       /** Filled by {@link SetSink}; assigned by {@link #coTest()} before the job is executed. */
+       static HashSet<String> resultSet;
+
+       /** Adds every received string to {@link #resultSet}. */
+       private static class SetSink implements SinkFunction<String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void invoke(String value) {
+                       resultSet.add(value);
+               }
+       }
+
+       /**
+        * Connects a string stream with a generated long sequence, maps both sides
+        * to strings and checks that the sink received exactly the expected set.
+        */
+       @Test
+       public void coTest() throws Exception {
+               StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALLELISM, MEMORYSIZE);
+
+               DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
+               DataStream<Long> generatedSequence = env.generateSequence(0, 3);
+
+               fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
+
+               resultSet = new HashSet<String>();
+               env.execute();
+
+               HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
+                               "2", "3"));
+               assertEquals(expectedSet, resultSet);
+       }
+
+       /**
+        * Runs source, map and sink in one job and checks that the sink stored ten
+        * entries, each mapping a key {@code k} to {@code k + 1}.
+        */
+       @Test
+       public void runStream() throws Exception {
+               // The map is static, so drop anything left over from an earlier run
+               // before asserting on its size.
+               data.clear();
+
+               StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALLELISM, MEMORYSIZE);
+
+               env.addSource(new MySource()).setParallelism(SOURCE_PARALLELISM).map(new MyTask())
+                               .addSink(new MySink());
+
+               env.execute();
+               assertEquals(10, data.size());
+
+               for (Map.Entry<Integer, Integer> entry : data.entrySet()) {
+                       assertEquals((Integer) (entry.getKey() + 1), entry.getValue());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
new file mode 100644
index 0000000..e12b254
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.deltafunction;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+/**
+ * Compares the results of {@link CosineDistance#getDelta} with precomputed
+ * reference values.
+ */
+public class CosineDistanceTest {
+
+       /**
+        * Checks every input pair in {@code testdata} against the reference value
+        * at the same index, with an absolute tolerance of 0.000001. Pairs that
+        * contain an all-zero vector are expected to yield 0.
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Test
+       public void testCosineDistance() {
+
+               //Reference calculated using wolfram alpha
+               double[][][] testdata={
+                               {{0,0,0},{0,0,0}},
+                               {{0,0,0},{1,2,3}},
+                               {{1,2,3},{0,0,0}},
+                               {{1,2,3},{4,5,6}},
+                               {{1,2,3},{-4,-5,-6}},
+                               {{1,2,-3},{-4,5,-6}},
+                               {{1,2,3,4},{5,6,7,8}},
+                               {{1,2},{3,4}},
+                               {{1},{2}},
+                       };
+               double[] referenceSolutions={
+                               0,
+                               0,
+                               0,
+                               0.025368,
+                               1.974631,
+                               0.269026,
+                               0.031136,
+                               0.016130,
+                               0
+               };
+
+               for (int i = 0; i < testdata.length; i++) {
+                       double[] first = testdata[i][0];
+                       double[] second = testdata[i][1];
+                       // Report both operands so a failure identifies the exact input pair.
+                       assertEquals("Wrong result for inputs " + arrayToString(first) + " and "
+                                       + arrayToString(second), referenceSolutions[i],
+                                       new CosineDistance().getDelta(first, second), 0.000001);
+               }
+       }
+
+       /**
+        * Formats an array as {@code {a,b,c}} for use in assertion messages.
+        *
+        * @param in
+        *            values to format
+        * @return the formatted values, or {@code {}} for an empty array
+        */
+       private String arrayToString(double[] in){
+               if (in.length == 0) {
+                       return "{}";
+               }
+               StringBuilder result = new StringBuilder("{");
+               for (double d : in) {
+                       result.append(d).append(',');
+               }
+               // Turn the trailing separator into the closing brace.
+               result.setCharAt(result.length() - 1, '}');
+               return result.toString();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
new file mode 100644
index 0000000..8c62497
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.deltafunction;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+/**
+ * Compares the results of {@link EuclideanDistance#getDelta} with precomputed
+ * reference values.
+ */
+public class EuclideanDistanceTest {
+
+       /**
+        * Checks every input pair in {@code testdata} against the reference value
+        * at the same index, with an absolute tolerance of 0.000001.
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Test
+       public void testEuclideanDistance() {
+
+               //Reference calculated using wolfram alpha
+               double[][][] testdata={
+                               {{0,0,0},{0,0,0}},
+                               {{0,0,0},{1,2,3}},
+                               {{1,2,3},{0,0,0}},
+                               {{1,2,3},{4,5,6}},
+                               {{1,2,3},{-4,-5,-6}},
+                               {{1,2,-3},{-4,5,-6}},
+                               {{1,2,3,4},{5,6,7,8}},
+                               {{1,2},{3,4}},
+                               {{1},{2}},
+                       };
+               double[] referenceSolutions={
+                               0,
+                               3.741657,
+                               3.741657,
+                               5.196152,
+                               12.4499,
+                               6.557439,
+                               8.0,
+                               2.828427,
+                               1
+               };
+
+               for (int i = 0; i < testdata.length; i++) {
+                       double[] first = testdata[i][0];
+                       double[] second = testdata[i][1];
+                       // Report both operands so a failure identifies the exact input pair.
+                       assertEquals("Wrong result for inputs " + arrayToString(first) + " and "
+                                       + arrayToString(second), referenceSolutions[i],
+                                       new EuclideanDistance().getDelta(first, second), 0.000001);
+               }
+
+       }
+
+       /**
+        * Formats an array as {@code {a,b,c}} for use in assertion messages.
+        *
+        * @param in
+        *            values to format
+        * @return the formatted values, or {@code {}} for an empty array
+        */
+       private String arrayToString(double[] in){
+               if (in.length == 0) {
+                       return "{}";
+               }
+               StringBuilder result = new StringBuilder("{");
+               for (double d : in) {
+                       result.append(d).append(',');
+               }
+               // Turn the trailing separator into the closing brace.
+               result.setCharAt(result.length() - 1, '}');
+               return result.toString();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java
new file mode 100644
index 0000000..17d3974
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.streaming.api.windowing.extractor.ArrayFromTuple;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the conversion of tuples of every arity into arrays by
+ * {@link ArrayFromTuple}, both in field order and in a caller-chosen order.
+ */
+public class ArrayFromTupleTest {
+
+       /** Tuple classes ordered by arity; index {@code n} holds the class with {@code n + 1} fields. */
+       private static final Class<?>[] CLASSES = new Class<?>[] {
+                       Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class,
+                       Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class,
+                       Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class,
+                       Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class,
+                       Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class };
+
+       /** Field values used to fill the tuples; element {@code i} is the decimal string of {@code i}. */
+       private String[] testStrings;
+
+       /** Rebuilds {@link #testStrings} with one entry per possible tuple position. */
+       @Before
+       public void init() {
+               String[] values = new String[Tuple.MAX_ARITY];
+               for (int position = 0; position < values.length; position++) {
+                       values[position] = String.valueOf(position);
+               }
+               testStrings = values;
+       }
+
+       /**
+        * For every tuple class, fills all fields and expects the extractor built
+        * without arguments to return them in field order.
+        */
+       @Test
+       public void testConvertFromTupleToArray() throws InstantiationException, IllegalAccessException {
+               for (int arity = 1; arity <= Tuple.MAX_ARITY; arity++) {
+                       Tuple tuple = (Tuple) CLASSES[arity - 1].newInstance();
+                       String[] expected = new String[arity];
+                       for (int position = 0; position < arity; position++) {
+                               expected[position] = testStrings[position];
+                               tuple.setField(testStrings[position], position);
+                       }
+                       Object[] actual = new ArrayFromTuple().extract(tuple);
+                       arrayEqualityCheck(expected, actual);
+               }
+       }
+
+       /**
+        * Fills the largest tuple and expects extractors built with explicit field
+        * indexes to return exactly those fields, in the requested order and with
+        * repeated indexes allowed.
+        */
+       @Test
+       public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
+               final int last = Tuple.MAX_ARITY - 1;
+
+               Tuple fullTuple = (Tuple) CLASSES[last].newInstance();
+               for (int position = 0; position < Tuple.MAX_ARITY; position++) {
+                       fullTuple.setField(testStrings[position], position);
+               }
+
+               String[] mixedOrder = { testStrings[5], testStrings[3], testStrings[6], testStrings[7],
+                               testStrings[0] };
+               arrayEqualityCheck(mixedOrder, new ArrayFromTuple(5, 3, 6, 7, 0).extract(fullTuple));
+
+               String[] firstThenLast = { testStrings[0], testStrings[last] };
+               arrayEqualityCheck(firstThenLast, new ArrayFromTuple(0, last).extract(fullTuple));
+
+               String[] lastThenFirst = { testStrings[last], testStrings[0] };
+               arrayEqualityCheck(lastThenFirst, new ArrayFromTuple(last, 0).extract(fullTuple));
+
+               String[] withRepeats = {
+                               testStrings[13], testStrings[4], testStrings[5], testStrings[4],
+                               testStrings[2], testStrings[8], testStrings[6], testStrings[2],
+                               testStrings[8], testStrings[3], testStrings[5], testStrings[2],
+                               testStrings[16], testStrings[4], testStrings[3], testStrings[2],
+                               testStrings[6], testStrings[4], testStrings[7], testStrings[4],
+                               testStrings[2], testStrings[8], testStrings[7], testStrings[2] };
+               arrayEqualityCheck(withRepeats,
+                               new ArrayFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16, 4, 3, 2, 6, 4, 7, 4,
+                                               2, 8, 7, 2).extract(fullTuple));
+       }
+
+       /** Asserts that both arrays have the same length and equal elements at every index. */
+       private void arrayEqualityCheck(Object[] expected, Object[] actual) {
+               assertEquals("The result arrays must have the same length", expected.length, actual.length);
+               int index = 0;
+               for (Object expectedElement : expected) {
+                       assertEquals("Unequal fields at position " + index, expectedElement, actual[index]);
+                       index++;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtractTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtractTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtractTest.java
new file mode 100644
index 0000000..82a876a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtractTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.windowing.extractor.ArrayFromTuple;
+import org.apache.flink.streaming.api.windowing.extractor.ConcatinatedExtract;
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.extractor.FieldFromArray;
+import org.apache.flink.streaming.api.windowing.extractor.FieldFromTuple;
+import org.apache.flink.streaming.api.windowing.extractor.FieldsFromArray;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests chains of extractors assembled with {@link ConcatinatedExtract}.
+ */
+public class ConcatinatedExtractTest {
+
+       private final String[] testStringArray1 = { "1", "2", "3" };
+       private final int[] testIntArray1 = { 1, 2, 3 };
+       private final String[] testStringArray2 = { "4", "5", "6" };
+       private final int[] testIntArray2 = { 4, 5, 6 };
+       private final String[] testStringArray3 = { "7", "8", "9" };
+       private final int[] testIntArray3 = { 7, 8, 9 };
+       private Tuple2<String[], int[]>[] testTuple2Array;
+       private Tuple2<String[], int[]> testTuple2;
+       private Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]> testData;
+
+       /**
+        * Builds the nested fixture: field 0 of {@code testData} is a tuple of
+        * (string array 3, int array 3); field 1 is an array of two tuples holding
+        * (string array 1, int array 2) and (string array 2, int array 1).
+        */
+       @SuppressWarnings("unchecked")
+       @Before
+       public void setupData() {
+               testTuple2Array = new Tuple2[2];
+               testTuple2Array[0] = new Tuple2<String[], int[]>(testStringArray1, testIntArray2);
+               testTuple2Array[1] = new Tuple2<String[], int[]>(testStringArray2, testIntArray1);
+
+               testTuple2 = new Tuple2<String[], int[]>(testStringArray3, testIntArray3);
+
+               testData = new Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]>(testTuple2,
+                               testTuple2Array);
+       }
+
+       /**
+        * Follows field 0, then field 1, to reach int array 3 and expects its
+        * elements back in the order 2, 1, 0 as an {@code Integer[]}.
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Test
+       public void test1() {
+               Extractor ext = new ConcatinatedExtract(new FieldFromTuple(0), new FieldFromTuple(1))
+                               .add(new FieldsFromArray(Integer.class, 2, 1, 0));
+               int[] expected = { testIntArray3[2], testIntArray3[1], testIntArray3[0] };
+
+               // Run the chain once and compare each element of the same result.
+               Integer[] actual = (Integer[]) ext.extract(testData);
+               assertEquals(Integer.valueOf(expected[0]), actual[0]);
+               assertEquals(Integer.valueOf(expected[1]), actual[1]);
+               assertEquals(Integer.valueOf(expected[2]), actual[2]);
+       }
+
+       /**
+        * Walks from the tuple array in field 1 down to a single string and
+        * expects element 1 of string array 2.
+        */
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       @Test
+       public void test2() {
+               Extractor ext = new ConcatinatedExtract(new FieldFromTuple(1), // Tuple2<String[],int[]>[]
+                               new FieldsFromArray(Tuple2.class, 1)) // Tuple2<String[],int[]>[]
+                               .add(new FieldFromArray(0)) // Tuple2<String[],int[]>
+                               .add(new ArrayFromTuple(0)) // Object[] (Containing String[])
+                               .add(new FieldFromArray(0)) // String[]
+                               .add(new FieldFromArray(1)); // String
+
+               String expected2 = testStringArray2[1];
+               assertEquals(expected2, ext.extract(testData));
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java
new file mode 100644
index 0000000..2d4dbcf
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.streaming.api.windowing.extractor.FieldFromArray;
+import org.junit.Test;
+
+/**
+ * Tests that {@link FieldFromArray} returns the element at the configured
+ * index for object arrays and for primitive arrays.
+ */
+public class FieldFromArrayTest {
+
+       String[] testStringArray = { "0", "1", "2", "3", "4" };
+       Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
+       int[] testIntArray = { 20, 21, 22, 23, 24 };
+
+       /** Every index of a {@code String[]} yields the string stored at that index. */
+       @Test
+       public void testStringArray() {
+               for (int i = 0; i < this.testStringArray.length; i++) {
+                       assertEquals(this.testStringArray[i],
+                                       new FieldFromArray<String>(i).extract(testStringArray));
+               }
+       }
+
+       /** Every index of an {@code Integer[]} yields the {@code Integer} stored at that index. */
+       @Test
+       public void testIntegerArray() {
+               for (int i = 0; i < this.testIntegerArray.length; i++) {
+                       // The extracted elements are Integers, so the extractor is typed accordingly.
+                       assertEquals(this.testIntegerArray[i],
+                                       new FieldFromArray<Integer>(i).extract(testIntegerArray));
+               }
+       }
+
+       /** Every index of an {@code int[]} yields the boxed value stored at that index. */
+       @Test
+       public void testIntArray() {
+               for (int i = 0; i < this.testIntArray.length; i++) {
+                       assertEquals(Integer.valueOf(this.testIntArray[i]),
+                                       new FieldFromArray<Integer>(i).extract(testIntArray));
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java
new file mode 100644
index 0000000..528611a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.streaming.api.windowing.extractor.FieldFromTuple;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that {@link FieldFromTuple} extracts a single field from tuples of
+ * every class listed in {@link #CLASSES}.
+ */
+public class FieldFromTupleTest {
+
+       private String[] testStrings;
+
+       /** Fills {@code testStrings} with "0", "1", ... up to {@code Tuple.MAX_ARITY - 1}. */
+       @Before
+       public void init() {
+               testStrings = new String[Tuple.MAX_ARITY];
+               for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+                       testStrings[i] = Integer.toString(i);
+               }
+       }
+
+       /**
+        * Instantiates each tuple class, stores one test string per position and
+        * verifies that every position is extracted again.
+        *
+        * @throws InstantiationException if a tuple class cannot be instantiated
+        * @throws IllegalAccessException if a tuple constructor is not accessible
+        */
+       @Test
+       public void testSingleFieldExtraction() throws InstantiationException,
+                       IllegalAccessException {
+               // extract single fields
+               for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+                       Tuple current = (Tuple) CLASSES[i].newInstance();
+                       // CLASSES[i] is the tuple class with i + 1 fields, so positions
+                       // 0..i are all valid and all of them have to be covered.
+                       int arity = i + 1;
+                       for (int j = 0; j < arity; j++) {
+                               current.setField(testStrings[j], j);
+                       }
+                       for (int j = 0; j < arity; j++) {
+                               assertEquals(testStrings[j],
+                                               new FieldFromTuple<String>(j).extract(current));
+                       }
+               }
+       }
+
+       /** Tuple classes under test, ordered so that index i holds the class with i + 1 fields. */
+       private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
+                       Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
+                       Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
+                       Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
+                       Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
+                       Tuple24.class, Tuple25.class };
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java
new file mode 100644
index 0000000..3139aa5
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.streaming.api.windowing.extractor.FieldsFromArray;
+import org.junit.Test;
+
+/**
+ * Checks {@link FieldsFromArray} on a String[], an Integer[] and an int[]:
+ * single-index extraction, full reversal, and a reordered three-element
+ * selection.
+ */
+public class FieldsFromArrayTest {
+
+       String[] testStringArray = { "0", "1", "2", "3", "4" };
+       Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
+       int[] testIntArray = { 20, 21, 22, 23, 24 };
+
+       @Test
+       public void testStringArray() {
+               // one index at a time
+               for (int pos = 0; pos < testStringArray.length; pos++) {
+                       FieldsFromArray<String> single = new FieldsFromArray<String>(String.class, pos);
+                       arrayEqualityCheck(new String[] { testStringArray[pos] },
+                                       single.extract(testStringArray));
+               }
+
+               // every index, back to front
+               final int last = testStringArray.length - 1;
+               String[] reversed = new String[testStringArray.length];
+               for (int pos = last; pos >= 0; pos--) {
+                       reversed[last - pos] = testStringArray[pos];
+               }
+               FieldsFromArray<String> backwards =
+                               new FieldsFromArray<String>(String.class, 4, 3, 2, 1, 0);
+               arrayEqualityCheck(reversed, backwards.extract(testStringArray));
+
+               // a reordered subset of the indexes
+               String[] picked = { testStringArray[4], testStringArray[1], testStringArray[2] };
+               FieldsFromArray<String> shuffled = new FieldsFromArray<String>(String.class, 4, 1, 2);
+               arrayEqualityCheck(picked, shuffled.extract(testStringArray));
+       }
+
+       @Test
+       public void testIntegerArray() {
+               // one index at a time
+               for (int pos = 0; pos < testIntegerArray.length; pos++) {
+                       FieldsFromArray<Integer> single = new FieldsFromArray<Integer>(Integer.class, pos);
+                       arrayEqualityCheck(new Integer[] { testIntegerArray[pos] },
+                                       single.extract(testIntegerArray));
+               }
+
+               // every index, back to front
+               final int last = testIntegerArray.length - 1;
+               Integer[] reversed = new Integer[testIntegerArray.length];
+               for (int pos = last; pos >= 0; pos--) {
+                       reversed[last - pos] = testIntegerArray[pos];
+               }
+               FieldsFromArray<Integer> backwards =
+                               new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0);
+               arrayEqualityCheck(reversed, backwards.extract(testIntegerArray));
+
+               // a reordered subset of the indexes
+               Integer[] picked = { testIntegerArray[4], testIntegerArray[1], testIntegerArray[2] };
+               FieldsFromArray<Integer> shuffled = new FieldsFromArray<Integer>(Integer.class, 4, 1, 2);
+               arrayEqualityCheck(picked, shuffled.extract(testIntegerArray));
+       }
+
+       @Test
+       public void testIntArray() {
+               // one index at a time; the primitive values are boxed for comparison
+               for (int pos = 0; pos < testIntArray.length; pos++) {
+                       FieldsFromArray<Integer> single = new FieldsFromArray<Integer>(Integer.class, pos);
+                       arrayEqualityCheck(new Integer[] { testIntArray[pos] },
+                                       single.extract(testIntArray));
+               }
+
+               // every index, back to front
+               final int last = testIntArray.length - 1;
+               Integer[] reversed = new Integer[testIntArray.length];
+               for (int pos = last; pos >= 0; pos--) {
+                       reversed[last - pos] = testIntArray[pos];
+               }
+               FieldsFromArray<Integer> backwards =
+                               new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0);
+               arrayEqualityCheck(reversed, backwards.extract(testIntArray));
+
+               // a reordered subset of the indexes
+               Integer[] picked = { testIntArray[4], testIntArray[1], testIntArray[2] };
+               FieldsFromArray<Integer> shuffled = new FieldsFromArray<Integer>(Integer.class, 4, 1, 2);
+               arrayEqualityCheck(picked, shuffled.extract(testIntArray));
+       }
+
+       /**
+        * Asserts that both arrays have the same length and hold equal elements
+        * at every position.
+        *
+        * @param expected the array the test expects
+        * @param actual the array produced by the extractor
+        */
+       private void arrayEqualityCheck(Object[] expected, Object[] actual) {
+               assertEquals("The result arrays must have the same length",
+                               expected.length, actual.length);
+               int idx = 0;
+               while (idx < expected.length) {
+                       assertEquals("Unequal fields at position " + idx, expected[idx], actual[idx]);
+                       idx++;
+               }
+       }
+}

Reply via email to