http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
deleted file mode 100644
index ec8cda8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.FullStream;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class WindowIntegrationTest implements Serializable {
-
-       private static final long serialVersionUID = 1L;
-       private static final Integer MEMORYSIZE = 32;
-
-       @SuppressWarnings("serial")
-       public static class ModKey implements KeySelector<Integer, Integer> {
-               private int m;
-
-               public ModKey(int m) {
-                       this.m = m;
-               }
-
-               @Override
-               public Integer getKey(Integer value) throws Exception {
-                       return value % m;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       public static class IdentityWindowMap implements
-                       WindowMapFunction<Integer, StreamWindow<Integer>> {
-
-               @Override
-               public void mapWindow(Iterable<Integer> values, 
Collector<StreamWindow<Integer>> out)
-                               throws Exception {
-
-                       StreamWindow<Integer> window = new 
StreamWindow<Integer>();
-
-                       for (Integer value : values) {
-                               window.add(value);
-                       }
-                       out.collect(window);
-               }
-
-       }
-
-       @SuppressWarnings("serial")
-       @Test
-       public void test() throws Exception {
-
-               List<Integer> inputs = new ArrayList<Integer>();
-               inputs.add(1);
-               inputs.add(2);
-               inputs.add(2);
-               inputs.add(3);
-               inputs.add(4);
-               inputs.add(5);
-               inputs.add(10);
-               inputs.add(11);
-               inputs.add(11);
-
-               KeySelector<Integer, ?> key = new ModKey(2);
-
-               Timestamp<Integer> ts = new Timestamp<Integer>() {
-
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public long getTimestamp(Integer value) {
-                               return value;
-                       }
-               };
-
-               StreamExecutionEnvironment env = new TestStreamEnvironment(2, 
MEMORYSIZE);
-               env.disableOperatorChaining();
-
-               DataStream<Integer> source = env.fromCollection(inputs);
-
-               source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 
1)).sum(0).getDiscretizedStream()
-                               .addSink(new TestSink1());
-
-               source.window(Time.of(4, ts, 1)).groupBy(new 
ModKey(2)).mapWindow(new IdentityWindowMap())
-                               .flatten().addSink(new TestSink2());
-
-               source.groupBy(key).window(Time.of(4, ts, 
1)).sum(0).getDiscretizedStream()
-                               .addSink(new TestSink4());
-
-               source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new 
ModKey(2))
-                               .mapWindow(new 
IdentityWindowMap()).flatten().addSink(new TestSink5());
-
-               source.window(Time.of(2, ts)).every(Time.of(3, 
ts)).min(0).getDiscretizedStream()
-                               .addSink(new TestSink3());
-
-               source.groupBy(key).window(Time.of(4, ts, 
1)).max(0).getDiscretizedStream()
-                               .addSink(new TestSink6());
-
-               source.window(Time.of(5, ts, 1)).mapWindow(new 
IdentityWindowMap()).flatten()
-                               .addSink(new TestSink7());
-
-               source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 
1)).groupBy(new ModKey(2)).sum(0)
-                               .getDiscretizedStream().addSink(new 
TestSink8());
-
-               try {
-                       
source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream();
-                       fail();
-               } catch (Exception e) {
-               }
-               try {
-                       
source.window(FullStream.window()).getDiscretizedStream();
-                       fail();
-               } catch (Exception e) {
-               }
-               try {
-                       source.every(Count.of(5)).mapWindow(new 
IdentityWindowMap()).getDiscretizedStream();
-                       fail();
-               } catch (Exception e) {
-               }
-
-               
source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new 
TestSink11());
-
-               
source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0)
-                               .getDiscretizedStream().addSink(new 
TestSink12());
-
-               DataStream<Integer> source2 = env.addSource(new 
ParallelSourceFunction<Integer>() {
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
-                               for (int i = 1; i <= 10; i++) {
-                                       ctx.collect(i);
-                               }
-                       }
-
-                       @Override
-                       public void cancel() {
-                       }
-               });
-
-               DataStream<Integer> source3 = env.addSource(new 
RichParallelSourceFunction<Integer>() {
-                       private static final long serialVersionUID = 1L;
-
-                       private int i = 1;
-
-                       @Override
-                       public void open(Configuration parameters) throws 
Exception {
-                               super.open(parameters);
-                               i = 1 + 
getRuntimeContext().getIndexOfThisSubtask();
-                       }
-
-                       @Override
-                       public void cancel() {
-                       }
-
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
-                               for (;i < 11; i += 2) {
-                                       ctx.collect(i);
-                               }
-
-                       }
-               });
-
-               source2.window(Time.of(2, ts, 
1)).sum(0).getDiscretizedStream().addSink(new TestSink9());
-
-               source3.window(Time.of(5, ts, 1)).groupBy(new 
ModKey(2)).sum(0).getDiscretizedStream()
-                               .addSink(new TestSink10());
-
-               source.map(new MapFunction<Integer, Integer>() {
-                       @Override
-                       public Integer map(Integer value) throws Exception {
-                               return value;
-                       }
-               }).every(Time.of(5, ts, 
1)).sum(0).getDiscretizedStream().addSink(new TestSink13());
-
-               env.execute();
-
-               // sum ( Time of 3 slide 2 )
-               List<StreamWindow<Integer>> expected1 = new 
ArrayList<StreamWindow<Integer>>();
-               expected1.add(StreamWindow.fromElements(5));
-               expected1.add(StreamWindow.fromElements(11));
-               expected1.add(StreamWindow.fromElements(9));
-               expected1.add(StreamWindow.fromElements(10));
-               expected1.add(StreamWindow.fromElements(32));
-
-               validateOutput(expected1, TestSink1.windows);
-
-               // Tumbling Time of 4 grouped by mod 2
-               List<StreamWindow<Integer>> expected2 = new 
ArrayList<StreamWindow<Integer>>();
-               expected2.add(StreamWindow.fromElements(2, 2, 4));
-               expected2.add(StreamWindow.fromElements(1, 3));
-               expected2.add(StreamWindow.fromElements(5));
-               expected2.add(StreamWindow.fromElements(10));
-               expected2.add(StreamWindow.fromElements(11, 11));
-
-               validateOutput(expected2, TestSink2.windows);
-
-               // groupby mod 2 sum ( Tumbling Time of 4)
-               List<StreamWindow<Integer>> expected3 = new 
ArrayList<StreamWindow<Integer>>();
-               expected3.add(StreamWindow.fromElements(4));
-               expected3.add(StreamWindow.fromElements(5));
-               expected3.add(StreamWindow.fromElements(22));
-               expected3.add(StreamWindow.fromElements(8));
-               expected3.add(StreamWindow.fromElements(10));
-
-               validateOutput(expected3, TestSink4.windows);
-
-               // groupby mod3 Tumbling Count of 2 grouped by mod 2
-               List<StreamWindow<Integer>> expected4 = new 
ArrayList<StreamWindow<Integer>>();
-               expected4.add(StreamWindow.fromElements(2, 2));
-               expected4.add(StreamWindow.fromElements(1));
-               expected4.add(StreamWindow.fromElements(4));
-               expected4.add(StreamWindow.fromElements(5, 11));
-               expected4.add(StreamWindow.fromElements(10));
-               expected4.add(StreamWindow.fromElements(11));
-               expected4.add(StreamWindow.fromElements(3));
-
-               validateOutput(expected4, TestSink5.windows);
-
-               // min ( Time of 2 slide 3 )
-               List<StreamWindow<Integer>> expected5 = new 
ArrayList<StreamWindow<Integer>>();
-               expected5.add(StreamWindow.fromElements(1));
-               expected5.add(StreamWindow.fromElements(4));
-               expected5.add(StreamWindow.fromElements(10));
-
-               validateOutput(expected5, TestSink3.windows);
-
-               // groupby mod 2 max ( Tumbling Time of 4)
-               List<StreamWindow<Integer>> expected6 = new 
ArrayList<StreamWindow<Integer>>();
-               expected6.add(StreamWindow.fromElements(3));
-               expected6.add(StreamWindow.fromElements(5));
-               expected6.add(StreamWindow.fromElements(11));
-               expected6.add(StreamWindow.fromElements(4));
-               expected6.add(StreamWindow.fromElements(10));
-
-               validateOutput(expected6, TestSink6.windows);
-
-               List<StreamWindow<Integer>> expected7 = new 
ArrayList<StreamWindow<Integer>>();
-               expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
-               expected7.add(StreamWindow.fromElements(10));
-               expected7.add(StreamWindow.fromElements(10, 11, 11));
-
-               validateOutput(expected7, TestSink7.windows);
-
-               List<StreamWindow<Integer>> expected8 = new 
ArrayList<StreamWindow<Integer>>();
-               expected8.add(StreamWindow.fromElements(4, 8));
-               expected8.add(StreamWindow.fromElements(4, 5));
-               expected8.add(StreamWindow.fromElements(10, 22));
-
-               for (List<Integer> sw : TestSink8.windows) {
-                       Collections.sort(sw);
-               }
-
-               validateOutput(expected8, TestSink8.windows);
-
-               List<StreamWindow<Integer>> expected9 = new 
ArrayList<StreamWindow<Integer>>();
-               expected9.add(StreamWindow.fromElements(6));
-               expected9.add(StreamWindow.fromElements(14));
-               expected9.add(StreamWindow.fromElements(22));
-               expected9.add(StreamWindow.fromElements(30));
-               expected9.add(StreamWindow.fromElements(38));
-
-               validateOutput(expected9, TestSink9.windows);
-
-               List<StreamWindow<Integer>> expected10 = new 
ArrayList<StreamWindow<Integer>>();
-               expected10.add(StreamWindow.fromElements(6, 9));
-               expected10.add(StreamWindow.fromElements(16, 24));
-
-               for (List<Integer> sw : TestSink10.windows) {
-                       Collections.sort(sw);
-               }
-
-               validateOutput(expected10, TestSink10.windows);
-
-               List<StreamWindow<Integer>> expected11 = new 
ArrayList<StreamWindow<Integer>>();
-               expected11.add(StreamWindow.fromElements(8));
-               expected11.add(StreamWindow.fromElements(38));
-               expected11.add(StreamWindow.fromElements(49));
-
-               for (List<Integer> sw : TestSink11.windows) {
-                       Collections.sort(sw);
-               }
-
-               validateOutput(expected11, TestSink11.windows);
-
-               List<StreamWindow<Integer>> expected12 = new 
ArrayList<StreamWindow<Integer>>();
-               expected12.add(StreamWindow.fromElements(4, 4));
-               expected12.add(StreamWindow.fromElements(18, 20));
-               expected12.add(StreamWindow.fromElements(18, 31));
-
-               for (List<Integer> sw : TestSink12.windows) {
-                       Collections.sort(sw);
-               }
-
-               validateOutput(expected12, TestSink12.windows);
-
-               List<StreamWindow<Integer>> expected13 = new 
ArrayList<StreamWindow<Integer>>();
-               expected13.add(StreamWindow.fromElements(17));
-               expected13.add(StreamWindow.fromElements(27));
-               expected13.add(StreamWindow.fromElements(49));
-
-               for (List<Integer> sw : TestSink13.windows) {
-                       Collections.sort(sw);
-               }
-
-               validateOutput(expected13, TestSink13.windows);
-
-       }
-
-       public static <R> void validateOutput(List<R> expected, List<R> actual) 
{
-               assertEquals(new HashSet<R>(expected), new HashSet<R>(actual));
-       }
-
-       @SuppressWarnings("serial")
-       private static class TestSink1 implements 
SinkFunction<StreamWindow<Integer>> {
-
-               public static List<StreamWindow<Integer>> windows = Collections
-                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
-
-               @Override
-               public void invoke(StreamWindow<Integer> value) throws 
Exception {
-                       windows.add(value);
-               }
-
-       }
-
-       @SuppressWarnings("serial")
-       private static class TestSink2 implements 
SinkFunction<StreamWindow<Integer>> {
-
-               public static List<StreamWindow<Integer>> windows = Collections
-                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
-
-               @Override
-               public void invoke(StreamWindow<Integer> value) throws 
Exception {
-                       windows.add(value);
-               }
-
-       }
-
-       @SuppressWarnings("serial")
-       private static class TestSink3 implements 
SinkFunction<StreamWindow<Integer>> {
-
-               public static List<StreamWindow<Integer>> windows = Collections
-                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
-
-               @Override
-               public void invoke(StreamWindow<Integer> value) throws 
Exception {
-                       windows.add(value);
-               }
-
-       }
-
-       @SuppressWarnings("serial")
-       private static class TestSink4 implements 
SinkFunction<StreamWindow<Integer>> {
-
-               public static List<StreamWindow<Integer>> windows = Collections
-                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
-
-               @Override
-               public void invoke(StreamWindow<Integer> value) throws 
Exception {
-                       windows.add(value);
-               }
-
-       }
-
-       @SuppressWarnings("serial")
-       private static class TestSink5 implements 
SinkFunction<StreamWindow<Integer>> {
-
-               public static List<StreamWindow<Integer>> windows = Collections
-                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
-
-               @Override
-               public void invoke(StreamWindow<Integer> value) throws 
Exception {
-                       windows.add(value);
-               }
-
-       }
-
-       @SuppressWarnings("serial")
-       private static class TestSink6 implements 
SinkFunction<StreamWindow<Integer>> {
-
-               public static List<StreamWindow<Integer>> windows = Collections
-                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
-
-               @Override
-               public void invoke(StreamWindow<Integer> value) throws 
Exception {
-                       windows.add(value);
-               }
-
-       }
-
-       @SuppressWarnings("serial")
-       private static class TestSink7 implements 
SinkFunction<StreamWindow<Integer>> {
-
-               public static List<StreamWindow<Integer>> windows = Collections
-                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
-
-               @Override
-               public void invoke(StreamWindow<Integer> value) throws 
Exception {
-                       windows.add(value);
-               }
-
-       }
-
-       @SuppressWarnings("serial")
-       private static class TestSink8 implements 
SinkFunction<StreamWindow<Integer>> {
-
-               public static List<StreamWindow<Integer>> windows = Collections
-                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
-
-               @Override
-               public void invoke(StreamWindow<Integer> value) throws 
Exception {
-                       windows.add(value);
-               }
-
-       }
-
-       @SuppressWarnings("serial")
-       private static class TestSink9 implements 
SinkFunction<StreamWindow<Integer>> {
-
-               public static List<StreamWindow<Integer>> windows = Collections
-                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
-
-               @Override
-               public void invoke(StreamWindow<Integer> value) throws 
Exception {
-                       windows.add(value);
-               }
-
-       }
-
-       @SuppressWarnings("serial")
-       private static class TestSink10 implements 
SinkFunction<StreamWindow<Integer>> {
-
-               public static List<StreamWindow<Integer>> windows = Collections
-                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
-
-               @Override
-               public void invoke(StreamWindow<Integer> value) throws 
Exception {
-                       windows.add(value);
-               }
-
-       }
-
-       @SuppressWarnings("serial")
-       private static class TestSink11 implements 
SinkFunction<StreamWindow<Integer>> {
-
-               public static List<StreamWindow<Integer>> windows = Collections
-                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
-
-               @Override
-               public void invoke(StreamWindow<Integer> value) throws 
Exception {
-                       windows.add(value);
-               }
-
-       }
-
-       @SuppressWarnings("serial")
-       private static class TestSink12 implements 
SinkFunction<StreamWindow<Integer>> {
-
-               public static List<StreamWindow<Integer>> windows = Collections
-                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
-
-               @Override
-               public void invoke(StreamWindow<Integer> value) throws 
Exception {
-                       windows.add(value);
-               }
-
-       }
-
-       @SuppressWarnings("serial")
-       private static class TestSink13 implements 
SinkFunction<StreamWindow<Integer>> {
-
-               public static List<StreamWindow<Integer>> windows = Collections
-                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
-
-               @Override
-               public void invoke(StreamWindow<Integer> value) throws 
Exception {
-                       windows.add(value);
-               }
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
new file mode 100644
index 0000000..5e6ffa2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.helper.FullStream;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class WindowingITCase implements Serializable {
+
+       // Serialization version id; the enclosing test class is declared Serializable.
+       private static final long serialVersionUID = 1L;
+       // Passed as the second constructor argument to TestStreamEnvironment in test().
+       // NOTE(review): presumably the memory size for the test cluster — confirm the unit
+       // against TestStreamEnvironment.
+       private static final Integer MEMORYSIZE = 32;
+
+       /**
+        * Key selector that groups integers by their remainder modulo a fixed divisor.
+        *
+        * <p>The key is computed with Java's {@code %} operator, so the result takes the sign
+        * of the dividend: a negative input yields a non-positive key.
+        */
+       @SuppressWarnings("serial")
+       public static class ModKey implements KeySelector<Integer, Integer> {
+               /** Divisor applied to every incoming value; fixed at construction time. */
+               private final int m;
+
+               /**
+                * @param m divisor used to derive the key; a value of {@code 0} makes
+                *          {@link #getKey(Integer)} throw {@link ArithmeticException}
+                */
+               public ModKey(int m) {
+                       this.m = m;
+               }
+
+               /**
+                * @param value element to be keyed
+                * @return {@code value % m}
+                */
+               @Override
+               public Integer getKey(Integer value) throws Exception {
+                       return value % m;
+               }
+       }
+
+       /**
+        * Window function that forwards a window's contents unchanged: each element of the
+        * input is added, one by one, to a freshly created {@link StreamWindow}, which is then
+        * emitted as the single output record.
+        */
+       @SuppressWarnings("serial")
+       public static class IdentityWindowMap
+                       implements WindowMapFunction<Integer, StreamWindow<Integer>> {
+
+               @Override
+               public void mapWindow(Iterable<Integer> values, Collector<StreamWindow<Integer>> out)
+                               throws Exception {
+                       final StreamWindow<Integer> copy = new StreamWindow<Integer>();
+                       for (final Integer element : values) {
+                               copy.add(element);
+                       }
+                       // Exactly one output per invocation: the collected window itself.
+                       out.collect(copy);
+               }
+       }
+
+       @SuppressWarnings("serial")
+       @Test
+       public void test() throws Exception {
+
+               List<Integer> inputs = new ArrayList<Integer>();
+               inputs.add(1);
+               inputs.add(2);
+               inputs.add(2);
+               inputs.add(3);
+               inputs.add(4);
+               inputs.add(5);
+               inputs.add(10);
+               inputs.add(11);
+               inputs.add(11);
+
+               KeySelector<Integer, ?> key = new ModKey(2);
+
+               Timestamp<Integer> ts = new Timestamp<Integer>() {
+
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public long getTimestamp(Integer value) {
+                               return value;
+                       }
+               };
+
+               StreamExecutionEnvironment env = new TestStreamEnvironment(2, 
MEMORYSIZE);
+               env.disableOperatorChaining();
+
+               DataStream<Integer> source = env.fromCollection(inputs);
+
+               source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 
1)).sum(0).getDiscretizedStream()
+                               .addSink(new TestSink1());
+
+               source.window(Time.of(4, ts, 1)).groupBy(new 
ModKey(2)).mapWindow(new IdentityWindowMap())
+                               .flatten().addSink(new TestSink2());
+
+               source.groupBy(key).window(Time.of(4, ts, 
1)).sum(0).getDiscretizedStream()
+                               .addSink(new TestSink4());
+
+               source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new 
ModKey(2))
+                               .mapWindow(new 
IdentityWindowMap()).flatten().addSink(new TestSink5());
+
+               source.window(Time.of(2, ts)).every(Time.of(3, 
ts)).min(0).getDiscretizedStream()
+                               .addSink(new TestSink3());
+
+               source.groupBy(key).window(Time.of(4, ts, 
1)).max(0).getDiscretizedStream()
+                               .addSink(new TestSink6());
+
+               source.window(Time.of(5, ts, 1)).mapWindow(new 
IdentityWindowMap()).flatten()
+                               .addSink(new TestSink7());
+
+               source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 
1)).groupBy(new ModKey(2)).sum(0)
+                               .getDiscretizedStream().addSink(new 
TestSink8());
+
+               try {
+                       
source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream();
+                       fail();
+               } catch (Exception e) {
+               }
+               try {
+                       
source.window(FullStream.window()).getDiscretizedStream();
+                       fail();
+               } catch (Exception e) {
+               }
+               try {
+                       source.every(Count.of(5)).mapWindow(new 
IdentityWindowMap()).getDiscretizedStream();
+                       fail();
+               } catch (Exception e) {
+               }
+
+               
source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new 
TestSink11());
+
+               
source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0)
+                               .getDiscretizedStream().addSink(new 
TestSink12());
+
+               DataStream<Integer> source2 = env.addSource(new 
ParallelSourceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+                               for (int i = 1; i <= 10; i++) {
+                                       ctx.collect(i);
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               });
+
+               DataStream<Integer> source3 = env.addSource(new 
RichParallelSourceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       private int i = 1;
+
+                       @Override
+                       public void open(Configuration parameters) throws 
Exception {
+                               super.open(parameters);
+                               i = 1 + 
getRuntimeContext().getIndexOfThisSubtask();
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+                               for (;i < 11; i += 2) {
+                                       ctx.collect(i);
+                               }
+
+                       }
+               });
+
+               source2.window(Time.of(2, ts, 
1)).sum(0).getDiscretizedStream().addSink(new TestSink9());
+
+               source3.window(Time.of(5, ts, 1)).groupBy(new 
ModKey(2)).sum(0).getDiscretizedStream()
+                               .addSink(new TestSink10());
+
+               source.map(new MapFunction<Integer, Integer>() {
+                       @Override
+                       public Integer map(Integer value) throws Exception {
+                               return value;
+                       }
+               }).every(Time.of(5, ts, 
1)).sum(0).getDiscretizedStream().addSink(new TestSink13());
+
+               env.execute();
+
+               // sum ( Time of 3 slide 2 )
+               List<StreamWindow<Integer>> expected1 = new 
ArrayList<StreamWindow<Integer>>();
+               expected1.add(StreamWindow.fromElements(5));
+               expected1.add(StreamWindow.fromElements(11));
+               expected1.add(StreamWindow.fromElements(9));
+               expected1.add(StreamWindow.fromElements(10));
+               expected1.add(StreamWindow.fromElements(32));
+
+               validateOutput(expected1, TestSink1.windows);
+
+               // Tumbling Time of 4 grouped by mod 2
+               List<StreamWindow<Integer>> expected2 = new 
ArrayList<StreamWindow<Integer>>();
+               expected2.add(StreamWindow.fromElements(2, 2, 4));
+               expected2.add(StreamWindow.fromElements(1, 3));
+               expected2.add(StreamWindow.fromElements(5));
+               expected2.add(StreamWindow.fromElements(10));
+               expected2.add(StreamWindow.fromElements(11, 11));
+
+               validateOutput(expected2, TestSink2.windows);
+
+               // groupby mod 2 sum ( Tumbling Time of 4)
+               List<StreamWindow<Integer>> expected3 = new 
ArrayList<StreamWindow<Integer>>();
+               expected3.add(StreamWindow.fromElements(4));
+               expected3.add(StreamWindow.fromElements(5));
+               expected3.add(StreamWindow.fromElements(22));
+               expected3.add(StreamWindow.fromElements(8));
+               expected3.add(StreamWindow.fromElements(10));
+
+               validateOutput(expected3, TestSink4.windows);
+
+               // groupby mod3 Tumbling Count of 2 grouped by mod 2
+               List<StreamWindow<Integer>> expected4 = new 
ArrayList<StreamWindow<Integer>>();
+               expected4.add(StreamWindow.fromElements(2, 2));
+               expected4.add(StreamWindow.fromElements(1));
+               expected4.add(StreamWindow.fromElements(4));
+               expected4.add(StreamWindow.fromElements(5, 11));
+               expected4.add(StreamWindow.fromElements(10));
+               expected4.add(StreamWindow.fromElements(11));
+               expected4.add(StreamWindow.fromElements(3));
+
+               validateOutput(expected4, TestSink5.windows);
+
+               // min ( Time of 2 slide 3 )
+               List<StreamWindow<Integer>> expected5 = new 
ArrayList<StreamWindow<Integer>>();
+               expected5.add(StreamWindow.fromElements(1));
+               expected5.add(StreamWindow.fromElements(4));
+               expected5.add(StreamWindow.fromElements(10));
+
+               validateOutput(expected5, TestSink3.windows);
+
+               // groupby mod 2 max ( Tumbling Time of 4)
+               List<StreamWindow<Integer>> expected6 = new 
ArrayList<StreamWindow<Integer>>();
+               expected6.add(StreamWindow.fromElements(3));
+               expected6.add(StreamWindow.fromElements(5));
+               expected6.add(StreamWindow.fromElements(11));
+               expected6.add(StreamWindow.fromElements(4));
+               expected6.add(StreamWindow.fromElements(10));
+
+               validateOutput(expected6, TestSink6.windows);
+
+               List<StreamWindow<Integer>> expected7 = new 
ArrayList<StreamWindow<Integer>>();
+               expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
+               expected7.add(StreamWindow.fromElements(10));
+               expected7.add(StreamWindow.fromElements(10, 11, 11));
+
+               validateOutput(expected7, TestSink7.windows);
+
+               List<StreamWindow<Integer>> expected8 = new 
ArrayList<StreamWindow<Integer>>();
+               expected8.add(StreamWindow.fromElements(4, 8));
+               expected8.add(StreamWindow.fromElements(4, 5));
+               expected8.add(StreamWindow.fromElements(10, 22));
+
+               for (List<Integer> sw : TestSink8.windows) {
+                       Collections.sort(sw);
+               }
+
+               validateOutput(expected8, TestSink8.windows);
+
+               List<StreamWindow<Integer>> expected9 = new 
ArrayList<StreamWindow<Integer>>();
+               expected9.add(StreamWindow.fromElements(6));
+               expected9.add(StreamWindow.fromElements(14));
+               expected9.add(StreamWindow.fromElements(22));
+               expected9.add(StreamWindow.fromElements(30));
+               expected9.add(StreamWindow.fromElements(38));
+
+               validateOutput(expected9, TestSink9.windows);
+
+               List<StreamWindow<Integer>> expected10 = new 
ArrayList<StreamWindow<Integer>>();
+               expected10.add(StreamWindow.fromElements(6, 9));
+               expected10.add(StreamWindow.fromElements(16, 24));
+
+               for (List<Integer> sw : TestSink10.windows) {
+                       Collections.sort(sw);
+               }
+
+               validateOutput(expected10, TestSink10.windows);
+
+               List<StreamWindow<Integer>> expected11 = new 
ArrayList<StreamWindow<Integer>>();
+               expected11.add(StreamWindow.fromElements(8));
+               expected11.add(StreamWindow.fromElements(38));
+               expected11.add(StreamWindow.fromElements(49));
+
+               for (List<Integer> sw : TestSink11.windows) {
+                       Collections.sort(sw);
+               }
+
+               validateOutput(expected11, TestSink11.windows);
+
+               List<StreamWindow<Integer>> expected12 = new 
ArrayList<StreamWindow<Integer>>();
+               expected12.add(StreamWindow.fromElements(4, 4));
+               expected12.add(StreamWindow.fromElements(18, 20));
+               expected12.add(StreamWindow.fromElements(18, 31));
+
+               for (List<Integer> sw : TestSink12.windows) {
+                       Collections.sort(sw);
+               }
+
+               validateOutput(expected12, TestSink12.windows);
+
+               List<StreamWindow<Integer>> expected13 = new 
ArrayList<StreamWindow<Integer>>();
+               expected13.add(StreamWindow.fromElements(17));
+               expected13.add(StreamWindow.fromElements(27));
+               expected13.add(StreamWindow.fromElements(49));
+
+               for (List<Integer> sw : TestSink13.windows) {
+                       Collections.sort(sw);
+               }
+
+               validateOutput(expected13, TestSink13.windows);
+
+       }
+
+       public static <R> void validateOutput(List<R> expected, List<R> actual) 
{
+               assertEquals(new HashSet<R>(expected), new HashSet<R>(actual));
+       }
+
+       @SuppressWarnings("serial")
+       private static class TestSink1 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class TestSink2 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class TestSink3 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class TestSink4 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class TestSink5 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class TestSink6 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class TestSink7 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class TestSink8 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class TestSink9 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class TestSink10 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class TestSink11 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class TestSink12 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class TestSink13 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index eb49e26..6e22021 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -48,6 +48,8 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.InstantiationUtil;
@@ -103,12 +105,13 @@ public class StatefulOperatorTest {
        @Test
        public void apiTest() throws Exception {
                StreamExecutionEnvironment env = new TestStreamEnvironment(3, 
32);
-               
+
                KeyedDataStream<Integer> keyedStream = 
env.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).keyBy(new ModKey(4));
                
                keyedStream.map(new StatefulMapper()).addSink(new 
SinkFunction<String>() {
                        private static final long serialVersionUID = 1L;
-                       public void invoke(String value) throws Exception {}
+                       public void invoke(String value) throws Exception {
+                       }
                });
                
                keyedStream.map(new 
StatefulMapper2()).setParallelism(1).addSink(new SinkFunction<String>() {
@@ -128,8 +131,8 @@ public class StatefulOperatorTest {
 
        private void processInputs(StreamMap<Integer, ?> map, List<Integer> 
input) throws Exception {
                for (Integer i : input) {
-                       map.getRuntimeContext().setNextInput(i);
-                       map.processElement(i);
+                       map.getRuntimeContext().setNextInput(new 
StreamRecord<Integer>(i, 0L));
+                       map.processElement(new StreamRecord<Integer>(i, 0L));
                }
        }
 
@@ -144,11 +147,16 @@ public class StatefulOperatorTest {
 
                StreamMap<Integer, String> op = new StreamMap<Integer, 
String>(new StatefulMapper());
 
-               op.setup(new Output<String>() {
+               op.setup(new Output<StreamRecord<String>>() {
 
                        @Override
-                       public void collect(String record) {
-                               outputList.add(record);
+                       public void collect(StreamRecord<String> record) {
+                               outputList.add(record.getValue());
+                       }
+
+                       @Override
+                       public void emitWatermark(Watermark mark) {
+
                        }
 
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
index 4ac7fda..317a21c 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
@@ -40,6 +40,6 @@ public class MockRecordWriter extends 
RecordWriter<SerializationDelegate<StreamR
        
        @Override
        public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> 
record) {
-               emittedRecords.add(record.getInstance().getObject().f0);
+               emittedRecords.add(record.getInstance().getValue().f0);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
index 967c719..6bc0e30 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
@@ -22,10 +22,10 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.util.Collector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Test;
 
 public class BasicWindowBufferTest {
@@ -33,7 +33,7 @@ public class BasicWindowBufferTest {
        @Test
        public void testEmitWindow() throws Exception {
 
-               TestCollector<StreamWindow<Integer>> collector = new 
TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new 
TestOutput<StreamWindow<Integer>>();
                List<StreamWindow<Integer>> collected = 
collector.getCollected();
 
                WindowBuffer<Integer> wb = new BasicWindowBuffer<Integer>();
@@ -60,13 +60,13 @@ public class BasicWindowBufferTest {
                assertEquals(2, collected.size());
        }
 
-       public static class TestCollector<T> implements Collector<T> {
+       public static class TestOutput<T> implements Output<StreamRecord<T>> {
 
                private final List<T> collected = new ArrayList<T>();
 
                @Override
-               public void collect(T record) {
-                       collected.add(record);
+               public void collect(StreamRecord<T> record) {
+                       collected.add(record.getValue());
                }
 
                @Override
@@ -77,6 +77,10 @@ public class BasicWindowBufferTest {
                        return collected;
                }
 
+               @Override
+               public void emitWatermark(Watermark mark) {
+
+               }
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
index c91910b..8430499 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
@@ -32,8 +32,9 @@ import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import 
org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import 
org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
+
 import org.junit.Test;
 
 public class JumpingCountGroupedPreReducerTest {
@@ -58,7 +59,7 @@ public class JumpingCountGroupedPreReducerTest {
                inputs.add(new Tuple2<Integer, Integer>(1, -2));
                inputs.add(new Tuple2<Integer, Integer>(100, -200));
 
-               TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector 
= new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+               TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = 
new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
                List<StreamWindow<Tuple2<Integer, Integer>>> collected = 
collector.getCollected();
 
                WindowBuffer<Tuple2<Integer, Integer>> wb = new 
JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(
@@ -109,7 +110,7 @@ public class JumpingCountGroupedPreReducerTest {
                inputs.add(new Tuple2<Integer, Integer>(1, -2));
                inputs.add(new Tuple2<Integer, Integer>(100, -200));
 
-               TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector 
= new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+               TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = 
new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
                List<StreamWindow<Tuple2<Integer, Integer>>> collected = 
collector.getCollected();
 
                WindowBuffer<Tuple2<Integer, Integer>> wb = new 
JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
index ba890ab..2279264 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import 
org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import 
org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class JumpingCountPreReducerTest {
@@ -48,7 +48,7 @@ public class JumpingCountPreReducerTest {
                inputs.add(new Tuple2<Integer, Integer>(4, -2));
                inputs.add(new Tuple2<Integer, Integer>(5, -3));
 
-               TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector 
= new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+               TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = 
new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
                List<StreamWindow<Tuple2<Integer, Integer>>> collected = 
collector.getCollected();
 
                WindowBuffer<Tuple2<Integer, Integer>> wb = new 
JumpingCountPreReducer<Tuple2<Integer, Integer>>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
index 5b693e7..ce312d3 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import 
org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import 
org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class JumpingTimePreReducerTest {
@@ -39,7 +39,7 @@ public class JumpingTimePreReducerTest {
        @Test
        public void testEmitWindow() throws Exception {
 
-               TestCollector<StreamWindow<Integer>> collector = new 
TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new 
TestOutput<StreamWindow<Integer>>();
                List<StreamWindow<Integer>> collected = 
collector.getCollected();
 
                WindowBuffer<Integer> wb = new JumpingTimePreReducer<Integer>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
index 377bdb5..7f58527 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
@@ -1,34 +1,35 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 
 package org.apache.flink.streaming.api.windowing.windowbuffer;
 
-import static 
org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducerTest.checkResults;
+import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import 
org.apache.flink.streaming.api.operators.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.operators.windowing.WindowingITCase;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import 
org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import 
org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class SlidingCountGroupedPreReducerTest {
@@ -37,11 +38,11 @@ public class SlidingCountGroupedPreReducerTest {
 
        ReduceFunction<Integer> reducer = new SumReducer();
 
-       KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+       KeySelector<Integer, ?> key = new WindowingITCase.ModKey(2);
 
        @Test
        public void testPreReduce1() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
                                reducer, serializer, key, 3, 2, 0);
@@ -84,7 +85,7 @@ public class SlidingCountGroupedPreReducerTest {
 
        @Test
        public void testPreReduce2() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
                                reducer, serializer, key, 5, 2, 0);
@@ -126,7 +127,7 @@ public class SlidingCountGroupedPreReducerTest {
 
        @Test
        public void testPreReduce3() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
                                reducer, serializer, key, 6, 3, 0);
@@ -163,7 +164,7 @@ public class SlidingCountGroupedPreReducerTest {
 
        @Test
        public void testPreReduce4() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
                                reducer, serializer, key, 5, 1, 2);
@@ -217,4 +218,18 @@ public class SlidingCountGroupedPreReducerTest {
 
        }
 
+
+       protected static void checkResults(List<StreamWindow<Integer>> expected,
+                       List<StreamWindow<Integer>> actual) {
+
+               for (StreamWindow<Integer> sw : expected) {
+                       Collections.sort(sw);
+               }
+
+               for (StreamWindow<Integer> sw : actual) {
+                       Collections.sort(sw);
+               }
+
+               assertEquals(expected, actual);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
index 3ce65f1..156b875 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class SlidingCountPreReducerTest {
@@ -37,7 +37,7 @@ public class SlidingCountPreReducerTest {
 
        @Test
        public void testPreReduce1() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
                                serializer, 3, 2, 0);
@@ -80,7 +80,7 @@ public class SlidingCountPreReducerTest {
 
        @Test
        public void testPreReduce2() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
                                serializer, 5, 2, 0);
@@ -122,7 +122,7 @@ public class SlidingCountPreReducerTest {
 
        @Test
        public void testPreReduce3() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
                                serializer, 6, 3, 0);
@@ -159,7 +159,7 @@ public class SlidingCountPreReducerTest {
 
        @Test
        public void testPreReduce4() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
                                serializer, 5, 1, 2);

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
index 3f1cba1..68bceda 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
@@ -31,11 +31,11 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.operators.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.operators.windowing.WindowingITCase;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class SlidingTimeGroupedPreReducerTest {
@@ -48,7 +48,7 @@ public class SlidingTimeGroupedPreReducerTest {
        ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer();
 
 
-       KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+       KeySelector<Integer, ?> key = new WindowingITCase.ModKey(2);
        KeySelector<Tuple2<Integer, Integer>, ?> tupleKey = new TupleModKey(2);
 
        @Test
@@ -58,7 +58,7 @@ public class SlidingTimeGroupedPreReducerTest {
                // replaying the same sequence of elements with a later timestamp and expecting the same
                // result.
 
-               TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+               TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
 
                SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>>(tupleReducer,
                                tupleType.createSerializer(new ExecutionConfig()), tupleKey, 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
@@ -190,7 +190,7 @@ public class SlidingTimeGroupedPreReducerTest {
 
        @Test
        public void testPreReduce2() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
                                reducer, serializer, key, 5, 2, new TimestampWrapper<Integer>(
@@ -241,7 +241,7 @@ public class SlidingTimeGroupedPreReducerTest {
 
        @Test
        public void testPreReduce3() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
                                reducer, serializer, key, 6, 3, new TimestampWrapper<Integer>(
@@ -287,7 +287,7 @@ public class SlidingTimeGroupedPreReducerTest {
 
        @Test
        public void testPreReduce4() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
                                reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
index 0519da7..6a36c57 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class SlidingTimePreReducerTest {
@@ -50,7 +50,7 @@ public class SlidingTimePreReducerTest {
                // replaying the same sequence of elements with a later timestamp and expecting the same
                // result.
 
-               TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+               TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
 
                SlidingTimePreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimePreReducer<Tuple2<Integer, Integer>>(tupleReducer,
                                tupleType.createSerializer(new ExecutionConfig()), 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
@@ -145,7 +145,7 @@ public class SlidingTimePreReducerTest {
 
        @Test
        public void testPreReduce2() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
                                serializer, 5, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
@@ -195,7 +195,7 @@ public class SlidingTimePreReducerTest {
 
        @Test
        public void testPreReduce3() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
                                serializer, 6, 3, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
@@ -240,7 +240,7 @@ public class SlidingTimePreReducerTest {
 
        @Test
        public void testPreReduce4() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+               TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
                SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
                                serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
index c5107bf..3aee288 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 import org.junit.Test;
 
@@ -57,7 +57,7 @@ public class TumblingGroupedPreReducerTest {
                inputs.add(new Tuple2<Integer, Integer>(1, -1));
                inputs.add(new Tuple2<Integer, Integer>(1, -2));
 
-               TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+               TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
                List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
 
                WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>(
@@ -104,7 +104,7 @@ public class TumblingGroupedPreReducerTest {
                inputs.add(new Tuple2<Integer, Integer>(1, -1));
                inputs.add(new Tuple2<Integer, Integer>(1, -2));
 
-               TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+               TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
                List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
 
                WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
index b8de02e..3e537a5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
@@ -27,9 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class TumblingPreReducerTest {
@@ -49,7 +47,7 @@ public class TumblingPreReducerTest {
                inputs.add(new Tuple2<Integer, Integer>(3, -1));
                inputs.add(new Tuple2<Integer, Integer>(4, -2));
 
-               TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+               TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
                List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
 
                WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingPreReducer<Tuple2<Integer, Integer>>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
index 3f8401d..d8a3696 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.io.BarrierBuffer;
 import org.junit.Test;
 
 public class BarrierBufferIOTest {
@@ -55,7 +54,7 @@ public class BarrierBufferIOTest {
                                if (boe.isBuffer()) {
                                        boe.getBuffer().recycle();
                                } else {
-                                       barrierBuffer.processSuperstep(boe);
+                                       barrierBuffer.processBarrier(boe);
                                }
                        }
                        // System.out.println("Ran for " + (System.currentTimeMillis() -
@@ -101,14 +100,14 @@ public class BarrierBufferIOTest {
 
                private int numChannels;
                private BufferPool[] bufferPools;
-               private int[] currentSupersteps;
+               private int[] currentBarriers;
                BarrierGenerator[] barrierGens;
                int currentChannel = 0;
                long c = 0;
 
                public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
                        this.numChannels = bufferPools.length;
-                       this.currentSupersteps = new int[numChannels];
+                       this.currentBarriers = new int[numChannels];
                        this.bufferPools = bufferPools;
                        this.barrierGens = barrierGens;
                }
@@ -132,7 +131,7 @@ public class BarrierBufferIOTest {
                        currentChannel = (currentChannel + 1) % numChannels;
 
                        if (barrierGens[currentChannel].isNextBarrier()) {
-                               return BarrierBufferTest.createSuperstep(++currentSupersteps[currentChannel],
+                               return BarrierBufferTest.createBarrier(++currentBarriers[currentChannel],
                                                currentChannel);
                        } else {
                                Buffer buffer = bufferPools[currentChannel].requestBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 89ec7dc..cb5e046 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
 
 import org.junit.Test;
 
@@ -67,10 +67,10 @@ public class BarrierBufferTest {
                List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
                input.add(createBuffer(0));
                input.add(createBuffer(0));
-               input.add(createSuperstep(1, 0));
+               input.add(createBarrier(1, 0));
                input.add(createBuffer(0));
                input.add(createBuffer(0));
-               input.add(createSuperstep(2, 0));
+               input.add(createBarrier(2, 0));
                input.add(createBuffer(0));
 
                InputGate mockIG = new MockInputGate(1, input);
@@ -82,11 +82,11 @@ public class BarrierBufferTest {
                assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
                assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
                assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
-               bb.processSuperstep(nextBoe);
+               bb.processBarrier(nextBoe);
                assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
                assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
                assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
-               bb.processSuperstep(nextBoe);
+               bb.processBarrier(nextBoe);
                assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
 
                bb.cleanup();
@@ -98,18 +98,18 @@ public class BarrierBufferTest {
                List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
                input.add(createBuffer(0));
                input.add(createBuffer(1));
-               input.add(createSuperstep(1, 0));
-               input.add(createSuperstep(2, 0));
+               input.add(createBarrier(1, 0));
+               input.add(createBarrier(2, 0));
                input.add(createBuffer(0));
-               input.add(createSuperstep(3, 0));
+               input.add(createBarrier(3, 0));
                input.add(createBuffer(0));
                input.add(createBuffer(1));
-               input.add(createSuperstep(1, 1));
+               input.add(createBarrier(1, 1));
                input.add(createBuffer(0));
                input.add(createBuffer(1));
-               input.add(createSuperstep(2, 1));
-               input.add(createSuperstep(3, 1));
-               input.add(createSuperstep(4, 0));
+               input.add(createBarrier(2, 1));
+               input.add(createBarrier(3, 1));
+               input.add(createBarrier(4, 0));
                input.add(createBuffer(0));
                input.add(new BufferOrEvent(new EndOfPartitionEvent(), 1));
                
@@ -123,24 +123,24 @@ public class BarrierBufferTest {
                check(input.get(0), nextBoe = bb.getNextNonBlocked());
                check(input.get(1), nextBoe = bb.getNextNonBlocked());
                check(input.get(2), nextBoe = bb.getNextNonBlocked());
-               bb.processSuperstep(nextBoe);
+               bb.processBarrier(nextBoe);
                check(input.get(7), nextBoe = bb.getNextNonBlocked());
                check(input.get(8), nextBoe = bb.getNextNonBlocked());
-               bb.processSuperstep(nextBoe);
+               bb.processBarrier(nextBoe);
                check(input.get(3), nextBoe = bb.getNextNonBlocked());
-               bb.processSuperstep(nextBoe);
+               bb.processBarrier(nextBoe);
                check(input.get(10), nextBoe = bb.getNextNonBlocked());
                check(input.get(11), nextBoe = bb.getNextNonBlocked());
-               bb.processSuperstep(nextBoe);
+               bb.processBarrier(nextBoe);
                check(input.get(4), nextBoe = bb.getNextNonBlocked());
                check(input.get(5), nextBoe = bb.getNextNonBlocked());
-               bb.processSuperstep(nextBoe);
+               bb.processBarrier(nextBoe);
                check(input.get(12), nextBoe = bb.getNextNonBlocked());
-               bb.processSuperstep(nextBoe);
+               bb.processBarrier(nextBoe);
                check(input.get(6), nextBoe = bb.getNextNonBlocked());
                check(input.get(9), nextBoe = bb.getNextNonBlocked());
                check(input.get(13), nextBoe = bb.getNextNonBlocked());
-               bb.processSuperstep(nextBoe);
+               bb.processBarrier(nextBoe);
                check(input.get(14), nextBoe = bb.getNextNonBlocked());
                check(input.get(15), nextBoe = bb.getNextNonBlocked());
 
@@ -206,8 +206,8 @@ public class BarrierBufferTest {
                }
        }
 
-       protected static BufferOrEvent createSuperstep(long id, int channel) {
-               return new BufferOrEvent(new StreamingSuperstep(id, System.currentTimeMillis()), channel);
+       protected static BufferOrEvent createBarrier(long id, int channel) {
+               return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
        }
 
        protected static BufferOrEvent createBuffer(int channel) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
deleted file mode 100644
index 528829d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.runtime.io.BarrierBuffer;
-import org.apache.flink.streaming.runtime.io.CoRecordReader;
-import org.apache.flink.streaming.runtime.io.BarrierBufferTest.MockInputGate;
-import org.junit.Test;
-
-public class CoRecordReaderTest {
-
-       @Test
-       public void test() throws InterruptedException, IOException {
-
-               List<BufferOrEvent> input1 = new LinkedList<BufferOrEvent>();
-               input1.add(BarrierBufferTest.createBuffer(0));
-               input1.add(BarrierBufferTest.createSuperstep(1, 0));
-               input1.add(BarrierBufferTest.createBuffer(0));
-
-               InputGate ig1 = new MockInputGate(1, input1);
-
-               List<BufferOrEvent> input2 = new LinkedList<BufferOrEvent>();
-               input2.add(BarrierBufferTest.createBuffer(0));
-               input2.add(BarrierBufferTest.createBuffer(0));
-               input2.add(BarrierBufferTest.createSuperstep(1, 0));
-               input2.add(BarrierBufferTest.createBuffer(0));
-
-               InputGate ig2 = new MockInputGate(1, input2);
-
-               CoRecordReader<?, ?> coReader = new 
CoRecordReader<IOReadableWritable, IOReadableWritable>(
-                               ig1, ig2);
-               BarrierBuffer b1 = coReader.barrierBuffer1;
-               BarrierBuffer b2 = coReader.barrierBuffer2;
-
-               coReader.addToAvailable(ig1);
-               coReader.addToAvailable(ig2);
-               coReader.addToAvailable(ig2);
-               coReader.addToAvailable(ig1);
-
-               assertEquals(1, coReader.getNextReaderIndexBlocking());
-               b1.getNextNonBlocked();
-
-               assertEquals(2, coReader.getNextReaderIndexBlocking());
-               b2.getNextNonBlocked();
-
-               assertEquals(2, coReader.getNextReaderIndexBlocking());
-               b2.getNextNonBlocked();
-
-               assertEquals(1, coReader.getNextReaderIndexBlocking());
-               b1.getNextNonBlocked();
-               b1.processSuperstep(input1.get(1));
-
-               coReader.addToAvailable(ig1);
-               coReader.addToAvailable(ig2);
-               coReader.addToAvailable(ig2);
-
-               assertEquals(2, coReader.getNextReaderIndexBlocking());
-               b2.getNextNonBlocked();
-               b2.processSuperstep(input2.get(2));
-
-               assertEquals(1, coReader.getNextReaderIndexBlocking());
-               b1.getNextNonBlocked();
-
-               assertEquals(2, coReader.getNextReaderIndexBlocking());
-               b2.getNextNonBlocked();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
index aa4d24a..a1cea13 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertArrayEquals;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
@@ -32,7 +31,7 @@ public class BroadcastPartitionerTest {
        private BroadcastPartitioner<Tuple> broadcastPartitioner2;
        private BroadcastPartitioner<Tuple> broadcastPartitioner3;
        
-       private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+       private StreamRecord<Tuple> streamRecord = new 
StreamRecord<Tuple>(null);
        private SerializationDelegate<StreamRecord<Tuple>> sd = new 
SerializationDelegate<StreamRecord<Tuple>>(null);
 
        @Before

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
index b37e43a..2643bba 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 public class DistributePartitionerTest {
        
        private RebalancePartitioner<Tuple> distributePartitioner;
-       private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+       private StreamRecord<Tuple> streamRecord = new 
StreamRecord<Tuple>(null);
        private SerializationDelegate<StreamRecord<Tuple>> sd = new 
SerializationDelegate<StreamRecord<Tuple>>(
                        null);
        

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
index 94d29ac..05541f5 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
@@ -21,34 +21,28 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
 
 public class FieldsPartitionerTest {
 
-       private FieldsPartitioner<Tuple> fieldsPartitioner;
-       private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>()
-                       .setObject(new Tuple2<String, Integer>("test", 0));
-       private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>()
-                       .setObject(new Tuple2<String, Integer>("test", 42));
-       private SerializationDelegate<StreamRecord<Tuple>> sd1 = new 
SerializationDelegate<StreamRecord<Tuple>>(
-                       null);
-       private SerializationDelegate<StreamRecord<Tuple>> sd2 = new 
SerializationDelegate<StreamRecord<Tuple>>(
-                       null);
+       private FieldsPartitioner<Tuple2<String, Integer>> fieldsPartitioner;
+       private StreamRecord<Tuple2<String, Integer>> streamRecord1 = new 
StreamRecord<Tuple2<String, Integer>>(new Tuple2<String, Integer>("test", 0));
+       private StreamRecord<Tuple2<String, Integer>> streamRecord2 = new 
StreamRecord<Tuple2<String, Integer>>(new Tuple2<String, Integer>("test", 42));
+       private SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> 
sd1 = new SerializationDelegate<StreamRecord<Tuple2<String, Integer>>>(null);
+       private SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> 
sd2 = new SerializationDelegate<StreamRecord<Tuple2<String, Integer>>>(null);
 
        @Before
        public void setPartitioner() {
-               fieldsPartitioner = new FieldsPartitioner<Tuple>(new 
KeySelector<Tuple, String>() {
+               fieldsPartitioner = new FieldsPartitioner<Tuple2<String, 
Integer>>(new KeySelector<Tuple2<String, Integer>, String>() {
 
                        private static final long serialVersionUID = 1L;
 
                        @Override
-                       public String getKey(Tuple value) throws Exception {
+                       public String getKey(Tuple2<String, Integer> value) 
throws Exception {
                                return value.getField(0);
                        }
                });

Reply via email to