http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
new file mode 100644
index 0000000..1c273d3
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+
+public abstract class AggregationFunction<T> implements ReduceFunction<T> {
+       private static final long serialVersionUID = 1L;
+
+       public int position;
+
+       public AggregationFunction(int pos) {
+               this.position = pos;
+       }
+
+       public static enum AggregationType {
+               SUM, MIN, MAX, MINBY, MAXBY,
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
new file mode 100644
index 0000000..226c45a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
+
+public abstract class ComparableAggregator<T> extends AggregationFunction<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       public Comparator comparator;
+       public boolean byAggregate;
+       public boolean first;
+
+       public ComparableAggregator(int pos, AggregationType aggregationType, 
boolean first) {
+               super(pos);
+               this.comparator = Comparator.getForAggregation(aggregationType);
+               this.byAggregate = (aggregationType == AggregationType.MAXBY)
+                               || (aggregationType == AggregationType.MINBY);
+               this.first = first;
+       }
+
+       public static <R> AggregationFunction<R> getAggregator(int 
positionToAggregate,
+                       TypeInformation<R> typeInfo, AggregationType 
aggregationType) {
+               return getAggregator(positionToAggregate, typeInfo, 
aggregationType, false);
+       }
+
+       public static <R> AggregationFunction<R> getAggregator(int 
positionToAggregate,
+                       TypeInformation<R> typeInfo, AggregationType 
aggregationType, boolean first) {
+
+               if (typeInfo.isTupleType()) {
+                       return new 
TupleComparableAggregator<R>(positionToAggregate, aggregationType, first);
+               } else if (typeInfo instanceof BasicArrayTypeInfo
+                               || typeInfo instanceof PrimitiveArrayTypeInfo) {
+                       return new 
ArrayComparableAggregator<R>(positionToAggregate, aggregationType, first);
+               } else {
+                       return new 
SimpleComparableAggregator<R>(aggregationType);
+               }
+       }
+
+       public static <R> AggregationFunction<R> getAggregator(String field,
+                       TypeInformation<R> typeInfo, AggregationType 
aggregationType, boolean first) {
+
+               return new PojoComparableAggregator<R>(field, typeInfo, 
aggregationType, first);
+       }
+
+       private static class TupleComparableAggregator<T> extends 
ComparableAggregator<T> {
+
+               private static final long serialVersionUID = 1L;
+
+               public TupleComparableAggregator(int pos, AggregationType 
aggregationType, boolean first) {
+                       super(pos, aggregationType, first);
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public T reduce(T value1, T value2) throws Exception {
+                       Tuple tuple1 = (Tuple) value1;
+                       Tuple tuple2 = (Tuple) value2;
+
+                       Comparable<Object> o1 = tuple1.getField(position);
+                       Object o2 = tuple2.getField(position);
+
+                       int c = comparator.isExtremal(o1, o2);
+
+                       if (byAggregate) {
+                               if (c == 1) {
+                                       return (T) tuple1;
+                               }
+                               if (first) {
+                                       if (c == 0) {
+                                               return (T) tuple1;
+                                       }
+                               }
+
+                               return (T) tuple2;
+
+                       } else {
+                               if (c == 1) {
+                                       tuple2.setField(o1, position);
+                               }
+                               return (T) tuple2;
+                       }
+
+               }
+       }
+
+       private static class ArrayComparableAggregator<T> extends 
ComparableAggregator<T> {
+
+               private static final long serialVersionUID = 1L;
+
+               public ArrayComparableAggregator(int pos, AggregationType 
aggregationType, boolean first) {
+                       super(pos, aggregationType, first);
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public T reduce(T array1, T array2) throws Exception {
+
+                       Object v1 = Array.get(array1, position);
+                       Object v2 = Array.get(array2, position);
+
+                       int c = comparator.isExtremal((Comparable<Object>) v1, 
v2);
+
+                       if (byAggregate) {
+                               if (c == 1) {
+                                       return array1;
+                               }
+                               if (first) {
+                                       if (c == 0) {
+                                               return array1;
+                                       }
+                               }
+
+                               return array2;
+                       } else {
+                               if (c == 1) {
+                                       Array.set(array2, position, v1);
+                               }
+
+                               return array2;
+                       }
+               }
+
+       }
+
+       private static class SimpleComparableAggregator<T> extends 
ComparableAggregator<T> {
+
+               private static final long serialVersionUID = 1L;
+
+               public SimpleComparableAggregator(AggregationType 
aggregationType) {
+                       super(0, aggregationType, false);
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public T reduce(T value1, T value2) throws Exception {
+
+                       if (comparator.isExtremal((Comparable<Object>) value1, 
value2) == 1) {
+                               return value1;
+                       } else {
+                               return value2;
+                       }
+               }
+
+       }
+
+       private static class PojoComparableAggregator<T> extends 
ComparableAggregator<T> {
+
+               private static final long serialVersionUID = 1L;
+               PojoComparator<T> pojoComparator;
+
+               public PojoComparableAggregator(String field, 
TypeInformation<?> typeInfo,
+                               AggregationType aggregationType, boolean first) 
{
+                       super(0, aggregationType, first);
+                       if (!(typeInfo instanceof CompositeType<?>)) {
+                               throw new IllegalArgumentException(
+                                               "Key expressions are only 
supported on POJO types and Tuples. "
+                                                               + "A type is 
considered a POJO if all its fields are public, or have both getters and 
setters defined");
+                       }
+
+                       @SuppressWarnings("unchecked")
+                       CompositeType<T> cType = (CompositeType<T>) typeInfo;
+
+                       List<FlatFieldDescriptor> fieldDescriptors = 
cType.getFlatFields(field);
+                       int logicalKeyPosition = 
fieldDescriptors.get(0).getPosition();
+
+                       if (cType instanceof PojoTypeInfo) {
+                               pojoComparator = (PojoComparator<T>) 
cType.createComparator(
+                                               new int[] { logicalKeyPosition 
}, new boolean[] { false }, 0);
+                       } else {
+                               throw new IllegalArgumentException(
+                                               "Key expressions are only 
supported on POJO types. "
+                                                               + "A type is 
considered a POJO if all its fields are public, or have both getters and 
setters defined");
+                       }
+               }
+
+               @Override
+               public T reduce(T value1, T value2) throws Exception {
+
+                       Field[] keyFields = pojoComparator.getKeyFields();
+                       Object field1 = 
pojoComparator.accessField(keyFields[0], value1);
+                       Object field2 = 
pojoComparator.accessField(keyFields[0], value2);
+
+                       @SuppressWarnings("unchecked")
+                       int c = comparator.isExtremal((Comparable<Object>) 
field1, field2);
+
+                       if (byAggregate) {
+                               if (c == 1) {
+                                       return value1;
+                               }
+                               if (first) {
+                                       if (c == 0) {
+                                               return value1;
+                                       }
+                               }
+
+                               return value2;
+                       } else {
+                               if (c == 1) {
+                                       keyFields[0].set(value2, field1);
+                               } 
+                               
+                               return value2;
+                       }
+               }
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java
new file mode 100644
index 0000000..f56774b
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import java.io.Serializable;
+
+import 
org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
+
+/**
+ * Comparison strategy used by the comparison based aggregations. Each
+ * aggregation type handled by {@link #getForAggregation(AggregationType)} has
+ * its own implementation.
+ */
+public abstract class Comparator implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Compares {@code o1} with {@code o2} via {@code o1.compareTo(o2)}.
+        *
+        * @return 1 if {@code o1} is strictly greater (MAX, MAXBY) or strictly
+        *         smaller (MIN, MINBY) than {@code o2}. Otherwise MIN and MAX
+        *         return 0, while MINBY and MAXBY return 0 on a tie and -1 in the
+        *         remaining case.
+        */
+       public abstract <R> int isExtremal(Comparable<R> o1, R o2);
+
+       /**
+        * Returns the comparator for the given aggregation type.
+        *
+        * @throws IllegalArgumentException
+        *             for any type other than MIN, MAX, MINBY and MAXBY.
+        */
+       public static Comparator getForAggregation(AggregationType type) {
+               switch (type) {
+               case MIN:
+                       return new MinComparator();
+               case MAX:
+                       return new MaxComparator();
+               case MINBY:
+                       return new MinByComparator();
+               case MAXBY:
+                       return new MaxByComparator();
+               default:
+                       throw new IllegalArgumentException("Unsupported aggregation type.");
+               }
+       }
+
+       /** Yields 1 when the first value is strictly greater, otherwise 0. */
+       private static class MaxComparator extends Comparator {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public <R> int isExtremal(Comparable<R> o1, R o2) {
+                       if (o1.compareTo(o2) > 0) {
+                               return 1;
+                       }
+                       return 0;
+               }
+
+       }
+
+       /** Yields the sign of the comparison: 1 greater, 0 equal, -1 smaller. */
+       private static class MaxByComparator extends Comparator {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public <R> int isExtremal(Comparable<R> o1, R o2) {
+                       return Integer.signum(o1.compareTo(o2));
+               }
+
+       }
+
+       /** Yields the inverted sign of the comparison: 1 smaller, 0 equal, -1 greater. */
+       private static class MinByComparator extends Comparator {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public <R> int isExtremal(Comparable<R> o1, R o2) {
+                       return -Integer.signum(o1.compareTo(o2));
+               }
+
+       }
+
+       /** Yields 1 when the first value is strictly smaller, otherwise 0. */
+       private static class MinComparator extends Comparator {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public <R> int isExtremal(Comparable<R> o1, R o2) {
+                       if (o1.compareTo(o2) < 0) {
+                               return 1;
+                       }
+                       return 0;
+               }
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
new file mode 100644
index 0000000..142028b
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
+
+public abstract class SumAggregator {
+
+       public static <T> ReduceFunction<T> getSumFunction(int pos, Class<?> 
clazz,
+                       TypeInformation<T> typeInfo) {
+
+               if (typeInfo.isTupleType()) {
+                       return new TupleSumAggregator<T>(pos, 
SumFunction.getForClass(clazz));
+               } else if (typeInfo instanceof BasicArrayTypeInfo
+                               || typeInfo instanceof PrimitiveArrayTypeInfo) {
+                       return new ArraySumAggregator<T>(pos, 
SumFunction.getForClass(clazz));
+               } else {
+                       return new 
SimpleSumAggregator<T>(SumFunction.getForClass(clazz));
+               }
+
+       }
+
+       public static <T> ReduceFunction<T> getSumFunction(String field, 
TypeInformation<T> typeInfo) {
+
+               return new PojoSumAggregator<T>(field, typeInfo);
+       }
+
+       private static class TupleSumAggregator<T> extends 
AggregationFunction<T> {
+
+               private static final long serialVersionUID = 1L;
+
+               SumFunction adder;
+
+               public TupleSumAggregator(int pos, SumFunction adder) {
+                       super(pos);
+                       this.adder = adder;
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public T reduce(T value1, T value2) throws Exception {
+
+                       Tuple tuple1 = (Tuple) value1;
+                       Tuple tuple2 = (Tuple) value2;
+
+                       tuple2.setField(adder.add(tuple1.getField(position), 
tuple2.getField(position)),
+                                       position);
+
+                       return (T) tuple2;
+               }
+
+       }
+
+       private static class ArraySumAggregator<T> extends 
AggregationFunction<T> {
+
+               private static final long serialVersionUID = 1L;
+
+               SumFunction adder;
+
+               public ArraySumAggregator(int pos, SumFunction adder) {
+                       super(pos);
+                       this.adder = adder;
+               }
+
+               @Override
+               public T reduce(T value1, T value2) throws Exception {
+
+                       Object v1 = Array.get(value1, position);
+                       Object v2 = Array.get(value2, position);
+                       Array.set(value2, position, adder.add(v1, v2));
+                       return value2;
+               }
+
+       }
+
+       private static class SimpleSumAggregator<T> extends 
AggregationFunction<T> {
+
+               private static final long serialVersionUID = 1L;
+
+               SumFunction adder;
+
+               public SimpleSumAggregator(SumFunction adder) {
+                       super(0);
+                       this.adder = adder;
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public T reduce(T value1, T value2) throws Exception {
+
+                       return (T) adder.add(value1, value2);
+               }
+
+       }
+
+       private static class PojoSumAggregator<T> extends 
AggregationFunction<T> {
+
+               private static final long serialVersionUID = 1L;
+               SumFunction adder;
+               PojoComparator<T> comparator;
+
+               public PojoSumAggregator(String field, TypeInformation<?> type) 
{
+                       super(0);
+                       if (!(type instanceof CompositeType<?>)) {
+                               throw new IllegalArgumentException(
+                                               "Key expressions are only 
supported on POJO types and Tuples. "
+                                                               + "A type is 
considered a POJO if all its fields are public, or have both getters and 
setters defined");
+                       }
+
+                       @SuppressWarnings("unchecked")
+                       CompositeType<T> cType = (CompositeType<T>) type;
+
+                       List<FlatFieldDescriptor> fieldDescriptors = 
cType.getFlatFields(field);
+
+                       int logicalKeyPosition = 
fieldDescriptors.get(0).getPosition();
+                       Class<?> keyClass = 
fieldDescriptors.get(0).getType().getTypeClass();
+
+                       adder = SumFunction.getForClass(keyClass);
+
+                       if (cType instanceof PojoTypeInfo) {
+                               comparator = (PojoComparator<T>) 
cType.createComparator(
+                                               new int[] { logicalKeyPosition 
}, new boolean[] { false }, 0);
+                       } else {
+                               throw new IllegalArgumentException(
+                                               "Key expressions are only 
supported on POJO types. "
+                                                               + "A type is 
considered a POJO if all its fields are public, or have both getters and 
setters defined");
+                       }
+               }
+
+               @Override
+               public T reduce(T value1, T value2) throws Exception {
+
+                       Field[] keyFields = comparator.getKeyFields();
+                       Object field1 = comparator.accessField(keyFields[0], 
value1);
+                       Object field2 = comparator.accessField(keyFields[0], 
value2);
+
+                       keyFields[0].set(value2, adder.add(field1, field2));
+
+                       return value2;
+               }
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
new file mode 100644
index 0000000..2aef19c
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import java.io.Serializable;
+
+public abstract class SumFunction implements Serializable{
+
+       private static final long serialVersionUID = 1L;
+
+       public abstract Object add(Object o1, Object o2);
+
+       public static SumFunction getForClass(Class<?> clazz) {
+
+               if (clazz == Integer.class) {
+                       return new IntSum();
+               } else if (clazz == Long.class) {
+                       return new LongSum();
+               } else if (clazz == Short.class) {
+                       return new ShortSum();
+               } else if (clazz == Double.class) {
+                       return new DoubleSum();
+               } else if (clazz == Float.class) {
+                       return new FloatSum();
+               } else if (clazz == Byte.class) {
+                       return new ByteSum();
+               } else {
+                       throw new RuntimeException("DataStream cannot be summed 
because the class "
+                                       + clazz.getSimpleName() + " does not 
support the + operator.");
+               }
+       }
+
+       public static class IntSum extends SumFunction {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Object add(Object value1, Object value2) {
+                       return (Integer) value1 + (Integer) value2;
+               }
+       }
+
+       public static class LongSum extends SumFunction {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Object add(Object value1, Object value2) {
+                       return (Long) value1 + (Long) value2;
+               }
+       }
+
+       public static class DoubleSum extends SumFunction {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Object add(Object value1, Object value2) {
+                       return (Double) value1 + (Double) value2;
+               }
+       }
+
+       public static class ShortSum extends SumFunction {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Object add(Object value1, Object value2) {
+                       return (Short) value1 + (Short) value2;
+               }
+       }
+
+       public static class FloatSum extends SumFunction {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Object add(Object value1, Object value2) {
+                       return (Float) value1 + (Float) value2;
+               }
+       }
+
+       public static class ByteSum extends SumFunction {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Object add(Object value1, Object value2) {
+                       return (Byte) value1 + (Byte) value2;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoFlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoFlatMapFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoFlatMapFunction.java
new file mode 100644
index 0000000..10e8bac
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoFlatMapFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+/**
+ * FlatMap-style transformation over two inputs whose element types may
+ * differ. Values of the first input type are passed to {@code flatMap1},
+ * values of the second input type to {@code flatMap2}; both emit results of
+ * the same output type through the supplied {@link Collector}.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+       /**
+        * Handles one value of the first input type.
+        *
+        * @param value
+        *            The value to transform.
+        * @param out
+        *            Collector receiving the produced output elements.
+        * @throws Exception
+        *             Implementations may throw any exception.
+        */
+       void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
+
+       /**
+        * Handles one value of the second input type.
+        *
+        * @param value
+        *            The value to transform.
+        * @param out
+        *            Collector receiving the produced output elements.
+        * @throws Exception
+        *             Implementations may throw any exception.
+        */
+       void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
new file mode 100644
index 0000000..2903828
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * Map-style transformation over two inputs whose element types may differ.
+ * Values of the first input type go through {@code map1}, values of the second
+ * input type through {@code map2}; both yield the same output type.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+       /**
+        * Maps a value of the first input type to the output type.
+        *
+        * @param value
+        *            The value to map.
+        * @return The mapped value.
+        */
+       OUT map1(IN1 value);
+
+       /**
+        * Maps a value of the second input type to the output type.
+        *
+        * @param value
+        *            The value to map.
+        * @return The mapped value.
+        */
+       OUT map2(IN2 value);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
new file mode 100644
index 0000000..879a1b4
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * The CoReduceFunction interface represents a Reduce transformation with two
+ * different input streams. The reduce1 function combines groups of elements
+ * of the first input with the same key into a single value, while reduce2
+ * combines groups of elements of the second input with the same key into a
+ * single value. Each produced value is mapped to the same type by map1 and
+ * map2, respectively, to form one output stream.
+ * <p>
+ * The basic syntax for using a grouped CoReduceFunction is as follows:
+ *
+ * <pre>{@code
+ * ConnectedDataStream<X> input = ...;
+ *
+ * ConnectedDataStream<X> result = input.groupBy(keyPosition1, keyPosition2)
+ *          .reduce(new MyCoReduceFunction(), keyPosition1, keyPosition2).addSink(...);
+ * }</pre>
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+       /**
+        * The core method of CoReduceFunction for the first input, combining two
+        * values of the first input into one value of the same type. The reduce1
+        * function is consecutively applied to all values of a group until only
+        * a single value remains.
+        * <p>
+        * The method declares no checked exception, so implementations can only
+        * signal failure through unchecked exceptions.
+        *
+        * @param value1
+        *            The first value to combine.
+        * @param value2
+        *            The second value to combine.
+        * @return The combined value of both input values.
+        */
+       public IN1 reduce1(IN1 value1, IN1 value2);
+
+       /**
+        * The core method of CoReduceFunction for the second input, combining two
+        * values of the second input into one value of the same type. The reduce2
+        * function is consecutively applied to all values of a group until only
+        * a single value remains.
+        * <p>
+        * The method declares no checked exception, so implementations can only
+        * signal failure through unchecked exceptions.
+        *
+        * @param value1
+        *            The first value to combine.
+        * @param value2
+        *            The second value to combine.
+        * @return The combined value of both input values.
+        */
+       public IN2 reduce2(IN2 value1, IN2 value2);
+
+       /**
+        * Maps the reduced first input to the output type.
+        *
+        * @param value
+        *            The reduced value of the first input.
+        * @return The value converted to the output type.
+        */
+       public OUT map1(IN1 value);
+
+       /**
+        * Maps the reduced second input to the output type.
+        *
+        * @param value
+        *            The reduced value of the second input.
+        * @return The value converted to the output type.
+        */
+       public OUT map2(IN2 value);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoWindowFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoWindowFunction.java
new file mode 100644
index 0000000..2f2514f
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoWindowFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+/**
+ * Function over two lists of elements, one per input type, that emits its
+ * results through a {@link Collector}.
+ *
+ * @param <IN1>
+ *            Element type of the first list.
+ * @param <IN2>
+ *            Element type of the second list.
+ * @param <O>
+ *            Output type.
+ */
+public interface CoWindowFunction<IN1, IN2, O> extends Function, Serializable {
+
+       /**
+        * Processes the two lists and emits any number of output elements.
+        *
+        * @param first
+        *            Elements of the first input type.
+        * @param second
+        *            Elements of the second input type.
+        * @param out
+        *            Collector receiving the produced output elements.
+        * @throws Exception
+        *             Implementations may throw any exception.
+        */
+       void coWindow(List<IN1> first, List<IN2> second, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java
new file mode 100644
index 0000000..9cafcd1
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.util.Collector;
+
+public class CrossWindowFunction<IN1, IN2, OUT> implements 
CoWindowFunction<IN1, IN2, OUT> {
+       private static final long serialVersionUID = 1L;
+
+       private CrossFunction<IN1, IN2, OUT> crossFunction;
+
+       public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) {
+               this.crossFunction = crossFunction;
+       }
+
+       @Override
+       public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> 
out) throws Exception {
+               for (IN1 firstValue : first) {
+                       for (IN2 secondValue : second) {
+                               out.collect(crossFunction.cross(firstValue, 
secondValue));
+                       }
+               }
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
new file mode 100644
index 0000000..9b39f33
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.util.Collector;
+
+public class JoinWindowFunction<IN1, IN2, OUT> implements 
CoWindowFunction<IN1, IN2, OUT> {
+       private static final long serialVersionUID = 1L;
+
+       private KeySelector<IN1, ?> keySelector1;
+       private KeySelector<IN2, ?> keySelector2;
+       private JoinFunction<IN1, IN2, OUT> joinFunction;
+
+       public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, 
KeySelector<IN2, ?> keySelector2,
+                       JoinFunction<IN1, IN2, OUT> joinFunction) {
+               this.keySelector1 = keySelector1;
+               this.keySelector2 = keySelector2;
+               this.joinFunction = joinFunction;
+       }
+
+       @Override
+       public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> 
out) throws Exception {
+               for (IN1 item1 : first) {
+                       Object key1 = keySelector1.getKey(item1);
+
+                       for (IN2 item2 : second) {
+                               Object key2 = keySelector2.getKey(item2);
+
+                               if (key1.equals(key2)) {
+                                       out.collect(joinFunction.join(item1, 
item2));
+                               }
+                       }
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
new file mode 100644
index 0000000..2458f1b
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * A RichCoFlatMapFunction represents a FlatMap transformation with two 
different input
+ * types. In addition to that the user can use the features provided by the
+ * {@link RichFunction} interface.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends 
AbstractRichFunction implements
+               CoFlatMapFunction<IN1, IN2, OUT> {
+
+       private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
new file mode 100755
index 0000000..20d520c
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * A RichCoMapFunction represents a Map transformation with two different input
+ * types. In addition to that the user can use the features provided by the
+ * {@link RichFunction} interface.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoMapFunction<IN1, IN2, OUT> extends 
AbstractRichFunction implements
+               CoMapFunction<IN1, IN2, OUT> {
+
+       private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
new file mode 100644
index 0000000..655923f
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * A RichCoReduceFunction represents a Reduce transformation with two 
different input
+ * types. In addition to that the user can use the features provided by the
+ * {@link RichFunction} interface.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoReduceFunction<IN1, IN2, OUT> extends 
AbstractRichFunction implements
+               CoReduceFunction<IN1, IN2, OUT> {
+
+       private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
new file mode 100644
index 0000000..2709203
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.util.Collector;
+
+public abstract class RichCoWindowFunction<IN1, IN2, O> extends 
AbstractRichFunction implements
+               CoWindowFunction<IN1, IN2, O> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public abstract void coWindow(List<IN1> first, List<IN2> second, 
Collector<O> out)
+                       throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
new file mode 100644
index 0000000..24beba1
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple implementation of the SinkFunction writing tuples in the specified
+ * OutputFormat format. Tuples are collected to a list and written to the file
+ * periodically. The target path and the overwrite mode are pre-packaged in
+ * format.
+ * 
+ * @param <IN>
+ *            Input type
+ */
+public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
+       private static final long serialVersionUID = 1L;
+       private static final Logger LOG = 
LoggerFactory.getLogger(FileSinkFunction.class);
+       protected ArrayList<IN> tupleList = new ArrayList<IN>();
+       protected volatile OutputFormat<IN> format;
+       protected volatile boolean cleanupCalled = false;
+       protected int indexInSubtaskGroup;
+       protected int currentNumberOfSubtasks;
+
+       public FileSinkFunction(OutputFormat<IN> format) {
+               this.format = format;
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               StreamingRuntimeContext context = (StreamingRuntimeContext) 
getRuntimeContext();
+               format.configure(context.getTaskStubParameters());
+               indexInSubtaskGroup = context.getIndexOfThisSubtask();
+               currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
+               format.open(indexInSubtaskGroup, currentNumberOfSubtasks);
+       }
+
+       @Override
+       public void invoke(IN record) throws Exception {
+               tupleList.add(record);
+               if (updateCondition()) {
+                       flush();
+               }
+       }
+
+       @Override
+       public void close() throws IOException {
+               if (!tupleList.isEmpty()) {
+                       flush();
+               }
+               try {
+                       format.close();
+               } catch (Exception ex) {
+                       try {
+                               if (!cleanupCalled && format instanceof 
CleanupWhenUnsuccessful) {
+                                       cleanupCalled = true;
+                                       ((CleanupWhenUnsuccessful) 
format).tryCleanupOnError();
+                               }
+                       } catch (Throwable t) {
+                               LOG.error("Cleanup on error failed.", t);
+                       }
+               }
+       }
+
+       protected void flush() {
+               try {
+                       for (IN rec : tupleList) {
+                               format.writeRecord(rec);
+                       }
+               } catch (Exception ex) {
+                       try {
+                               if (!cleanupCalled && format instanceof 
CleanupWhenUnsuccessful) {
+                                       cleanupCalled = true;
+                                       ((CleanupWhenUnsuccessful) 
format).tryCleanupOnError();
+                               }
+                       } catch (Throwable t) {
+                               LOG.error("Cleanup on error failed.", t);
+                       }
+               }
+               resetParameters();
+       }
+
+       /**
+        * Condition for writing the contents of tupleList and clearing it.
+        * 
+        * @return value of the updating condition
+        */
+       protected abstract boolean updateCondition();
+
+       /**
+        * Statements to be executed after writing a batch goes here.
+        */
+       protected abstract void resetParameters();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
new file mode 100644
index 0000000..f049a32
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import org.apache.flink.api.common.io.OutputFormat;
+
+/**
+ * Implementation of FileSinkFunction. Writes tuples to file in every millis
+ * milliseconds.
+ * 
+ * @param <IN>
+ *            Input type
+ */
+public class FileSinkFunctionByMillis<IN> extends FileSinkFunction<IN> {
+       private static final long serialVersionUID = 1L;
+
+       /** Minimum number of milliseconds between two writes. */
+       private final long millis;
+       /** System.currentTimeMillis() taken at construction or at the last reset. */
+       private long lastTime;
+
+       /**
+        * Creates a sink that writes its buffered records once at least
+        * {@code millis} milliseconds have passed since the previous write.
+        * 
+        * @param format
+        *            output format handed to the parent FileSinkFunction
+        * @param millis
+        *            minimum time between two writes, in milliseconds
+        */
+       public FileSinkFunctionByMillis(OutputFormat<IN> format, long millis) {
+               super(format);
+               this.lastTime = System.currentTimeMillis();
+               this.millis = millis;
+       }
+
+       /**
+        * Reports whether the configured interval has elapsed since lastTime.
+        * 
+        * @return true if at least {@code millis} milliseconds have passed
+        */
+       @Override
+       protected boolean updateCondition() {
+               final long elapsed = System.currentTimeMillis() - this.lastTime;
+               return elapsed >= this.millis;
+       }
+
+       /**
+        * Empties the record buffer and restarts the interval measurement.
+        */
+       @Override
+       protected void resetParameters() {
+               this.tupleList.clear();
+               this.lastTime = System.currentTimeMillis();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
new file mode 100755
index 0000000..d460749
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.PrintStream;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
+
+/**
+ * Implementation of the SinkFunction writing every tuple to the standard
+ * output or standard error stream.
+ * 
+ * @param <IN>
+ *            Input record type
+ */
+public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
+       private static final long serialVersionUID = 1L;
+
+       private static final boolean STD_OUT = false;
+       private static final boolean STD_ERR = true;
+
+       /** Selected target: STD_OUT (false, the field default) or STD_ERR (true). */
+       private boolean target;
+       /** Stream resolved from target in open(); set to null again in close(). */
+       private transient PrintStream stream;
+       /** "<subtask index + 1>> " when more than one parallel subtask runs, otherwise null. */
+       private transient String prefix;
+
+       /**
+        * Instantiates a print sink function that prints to standard out.
+        */
+       public PrintSinkFunction() {}
+
+       /**
+        * Instantiates a print sink function that prints to standard error or
+        * to standard out.
+        * 
+        * @param stdErr True, if the sink should print to standard error instead of standard out.
+        */
+       public PrintSinkFunction(boolean stdErr) {
+               target = stdErr;
+       }
+
+       /** Selects standard out as the target; takes effect at the next open(). */
+       public void setTargetToStandardOut() {
+               target = STD_OUT;
+       }
+
+       /** Selects standard error as the target; takes effect at the next open(). */
+       public void setTargetToStandardErr() {
+               target = STD_ERR;
+       }
+
+       /**
+        * Resolves the target stream and computes the per-subtask prefix.
+        * 
+        * @param parameters configuration passed on to the parent open()
+        */
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+               // get the target stream
+               stream = target == STD_OUT ? System.out : System.err;
+
+               // set the prefix if we have a >1 DOP
+               prefix = (context.getNumberOfParallelSubtasks() > 1) ?
+                               ((context.getIndexOfThisSubtask() + 1) + "> ") : null;
+       }
+
+       /**
+        * Prints one record on its own line, preceded by the subtask prefix if
+        * one was set in open().
+        * 
+        * @param record the record to print; a null record is printed as "null"
+        */
+       @Override
+       public void invoke(IN record) {
+               // String.valueOf renders null as "null" where record.toString()
+               // would throw a NullPointerException; for non-null records the
+               // text is the same as record.toString().
+               String line = String.valueOf(record);
+               if (prefix != null) {
+                       stream.println(prefix + line);
+               }
+               else {
+                       stream.println(line);
+               }
+       }
+
+       /** Drops the references set up in open() and closes the parent. */
+       @Override
+       public void close() throws Exception {
+               this.stream = null;
+               this.prefix = null;
+               super.close();
+       }
+
+       @Override
+       public String toString() {
+               return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
new file mode 100755
index 0000000..3b8a4db
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Base class for sink functions that extend AbstractRichFunction in addition
+ * to implementing SinkFunction.
+ * 
+ * @param <IN>
+ *            Input type
+ */
+public abstract class RichSinkFunction<IN> extends AbstractRichFunction 
implements SinkFunction<IN> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Consumes one input value.
+        * 
+        * @param value
+        *            the value handed to the sink
+        * @throws Exception
+        *             implementations may throw any exception
+        */
+       public abstract void invoke(IN value) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
new file mode 100644
index 0000000..6097603
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * Serializable function interface for sinks: implementations receive values
+ * one at a time through invoke.
+ * 
+ * @param <IN>
+ *            Input type
+ */
+public interface SinkFunction<IN> extends Function, Serializable {
+
+       /**
+        * Consumes one input value.
+        * 
+        * @param value
+        *            the value handed to the sink
+        * @throws Exception
+        *             implementations may throw any exception
+        */
+       public abstract void invoke(IN value) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
new file mode 100644
index 0000000..a606302
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/**
+ * Abstract class for formatting the output of the writeAsText and writeAsCsv
+ * functions.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public abstract class WriteFormat<IN> implements Serializable {
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Writes the contents of tupleList to the file specified by path. How
+        * each tuple is rendered is up to the implementation.
+        * 
+        * @param path
+        *            is the path to the location where the tuples are written
+        * @param tupleList
+        *            is the list of tuples to be written
+        */
+       protected abstract void write(String path, ArrayList<IN> tupleList);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
new file mode 100644
index 0000000..b22fd80
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+/**
+ * Writes tuples in csv format.
+ * 
+ * @param <IN>
+ *            Input tuple type
+ */
+public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Appends one line per tuple to the file at path. Each line is the
+        * tuple's toString() with its first and last character removed.
+        * NOTE(review): presumably toString() yields a bracketed form such as
+        * "(a,b)"; a result shorter than two characters makes substring throw.
+        * 
+        * @param path
+        *            file to append to
+        * @param tupleList
+        *            tuples to write
+        * @throws RuntimeException
+        *             wrapping the IOException if the file cannot be opened
+        */
+       @Override
+       protected void write(String path, ArrayList<IN> tupleList) {
+               try {
+                       PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
+                       try {
+                               for (IN tupleToWrite : tupleList) {
+                                       String strTuple = tupleToWrite.toString();
+                                       outStream.println(strTuple.substring(1, strTuple.length() - 1));
+                               }
+                       } finally {
+                               // Close even when a tuple's toString()/substring throws,
+                               // so the file handle is not leaked.
+                               outStream.close();
+                       }
+               } catch (IOException e) {
+                       throw new RuntimeException("Exception occured while writing file " + path, e);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
new file mode 100644
index 0000000..5891104
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+/**
+ * Writes tuples in text format.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public class WriteFormatAsText<IN> extends WriteFormat<IN> {
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Appends one line per tuple to the file at path, using the text that
+        * PrintWriter.println(Object) produces for the tuple.
+        * 
+        * @param path
+        *            file to append to
+        * @param tupleList
+        *            tuples to write
+        * @throws RuntimeException
+        *             wrapping the IOException if the file cannot be opened
+        */
+       @Override
+       public void write(String path, ArrayList<IN> tupleList) {
+               try {
+                       PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
+                       try {
+                               for (IN tupleToWrite : tupleList) {
+                                       outStream.println(tupleToWrite);
+                               }
+                       } finally {
+                               // Close even when a tuple's toString() throws, so the
+                               // file handle is not leaked.
+                               outStream.close();
+                       }
+               } catch (IOException e) {
+                       throw new RuntimeException("Exception occured while writing file " + path, e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
new file mode 100644
index 0000000..0c52afc
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+/**
+ * Simple implementation of the SinkFunction writing tuples as simple text to
+ * the file specified by path. Tuples are collected to a list and written to 
the
+ * file periodically. The file specified by path is created if it does not
+ * exist, cleared if it exists before the writing.
+ * 
+ * @param <IN>
+ *            Input tuple type
+ */
+public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
+       private static final long serialVersionUID = 1L;
+
+       /** Location of the target file. */
+       protected final String path;
+       /** Tuples collected since the last write. */
+       protected ArrayList<IN> tupleList = new ArrayList<IN>();
+       /** Strategy that renders the collected tuples into the file. */
+       protected WriteFormat<IN> format;
+
+       /**
+        * Stores path and format and truncates (or creates) the target file.
+        * 
+        * @param path
+        *            is the path to the location where the tuples are written
+        * @param format
+        *            is the format used for writing the collected tuples
+        */
+       public WriteSinkFunction(String path, WriteFormat<IN> format) {
+               this.path = path;
+               this.format = format;
+               cleanFile(path);
+       }
+
+       /**
+        * Opens the file at path for writing and closes it again, which creates
+        * it if it is missing and empties it otherwise.
+        * 
+        * @param path
+        *            is the path to the location where the tuples are written
+        */
+       protected void cleanFile(String path) {
+               final PrintWriter truncator;
+               try {
+                       truncator = new PrintWriter(path);
+               } catch (FileNotFoundException e) {
+                       throw new RuntimeException("File not found " + path, e);
+               }
+               truncator.print("");
+               truncator.close();
+       }
+
+       /**
+        * Tells invoke whether the collected tuples should be written now.
+        * 
+        * @return true if tupleList should be handed to the format
+        */
+       protected abstract boolean updateCondition();
+
+       /**
+        * Called by invoke right after a batch has been written.
+        */
+       protected abstract void resetParameters();
+
+       /**
+        * Adds the tuple to tupleList. When updateCondition() holds, the whole
+        * list is passed to the format for writing and resetParameters() is
+        * called afterwards.
+        */
+       @Override
+       public void invoke(IN tuple) {
+               tupleList.add(tuple);
+
+               if (!updateCondition()) {
+                       return;
+               }
+
+               format.write(path, tupleList);
+               resetParameters();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
new file mode 100644
index 0000000..ee6df94
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+/**
+ * Implementation of WriteSinkFunction. Writes tuples to file in every millis
+ * milliseconds.
+ * 
+ * @param <IN>
+ *            Input tuple type
+ */
+public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
+       private static final long serialVersionUID = 1L;
+
+       /** Minimum number of milliseconds between two writes. */
+       private final long millis;
+       /** System.currentTimeMillis() taken at construction or at the last reset. */
+       private long lastTime;
+
+       /**
+        * Creates a sink that writes its collected tuples once at least
+        * {@code millis} milliseconds have passed since the previous write.
+        * 
+        * @param path
+        *            target file, passed to the parent constructor
+        * @param format
+        *            write format, passed to the parent constructor
+        * @param millis
+        *            minimum time between two writes, in milliseconds
+        */
+       public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis) {
+               super(path, format);
+               this.lastTime = System.currentTimeMillis();
+               this.millis = millis;
+       }
+
+       /**
+        * Reports whether the configured interval has elapsed since lastTime.
+        */
+       @Override
+       protected boolean updateCondition() {
+               final long elapsed = System.currentTimeMillis() - this.lastTime;
+               return elapsed >= this.millis;
+       }
+
+       /**
+        * Empties the tuple buffer and restarts the interval measurement.
+        */
+       @Override
+       protected void resetParameters() {
+               this.tupleList.clear();
+               this.lastTime = System.currentTimeMillis();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
new file mode 100644
index 0000000..05a2489
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Source that periodically lists a path and emits one
+ * (file path, start offset, end offset) tuple for every file that passes the
+ * filter of the configured WatchType.
+ */
+public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class);
+
+       public enum WatchType {
+               /** Only new files will be processed. */
+               ONLY_NEW_FILES,
+               /** When some files are appended, all contents of the files will be processed. */
+               REPROCESS_WITH_APPENDED,
+               /** When some files are appended, only appended contents will be processed. */
+               PROCESS_ONLY_APPENDED
+       }
+
+       private String path;
+       /** Sleep time between two listings, in milliseconds (passed to Thread.sleep). */
+       private long interval;
+       private WatchType watchType;
+
+       /** Obtained from the path URI at the start of invoke. */
+       private FileSystem fileSystem;
+       /** Last emitted end offset per file path (-1 for whole-file emissions). */
+       private Map<String, Long> offsetOfFiles;
+       /** Last seen modification time per file name. */
+       private Map<String, Long> modificationTimes;
+
+       /**
+        * @param path
+        *            path that is listed on every poll
+        * @param interval
+        *            time to sleep between two polls, in milliseconds
+        * @param watchType
+        *            rule that decides which files are reported and with which
+        *            offsets
+        */
+       public FileMonitoringFunction(String path, long interval, WatchType watchType) {
+               this.path = path;
+               this.interval = interval;
+               this.watchType = watchType;
+               this.modificationTimes = new HashMap<String, Long>();
+               this.offsetOfFiles = new HashMap<String, Long>();
+       }
+
+       /**
+        * Polls the path in an endless loop. For ONLY_NEW_FILES and
+        * REPROCESS_WITH_APPENDED each reported file is emitted as
+        * (path, 0, -1). For PROCESS_ONLY_APPENDED it is emitted as
+        * (path, previously recorded offset or 0, current file length).
+        * 
+        * @param collector
+        *            receives the emitted tuples
+        */
+       @Override
+       public void invoke(Collector<Tuple3<String, Long, Long>> collector) throws Exception {
+               fileSystem = FileSystem.get(new URI(path));
+
+               while (true) {
+                       List<String> files = listNewFiles();
+                       for (String filePath : files) {
+                               if (watchType == WatchType.ONLY_NEW_FILES
+                                               || watchType == WatchType.REPROCESS_WITH_APPENDED) {
+                                       collector.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
+                                       offsetOfFiles.put(filePath, -1L);
+                               } else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
+                                       long offset = 0;
+                                       long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen();
+                                       if (offsetOfFiles.containsKey(filePath)) {
+                                               offset = offsetOfFiles.get(filePath);
+                                       }
+
+                                       collector.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
+                                       offsetOfFiles.put(filePath, fileSize);
+
+                                       LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);
+                               }
+                       }
+
+                       Thread.sleep(interval);
+               }
+       }
+
+       /**
+        * Lists the monitored path and returns the paths of all entries that
+        * are not filtered out, recording their modification times.
+        * 
+        * @return paths to report in this poll; empty if the listing yields
+        *         nothing
+        * @throws IOException
+        *             if the file system listing fails
+        */
+       private List<String> listNewFiles() throws IOException {
+               List<String> files = new ArrayList<String>();
+
+               FileStatus[] statuses = fileSystem.listStatus(new Path(path));
+
+               // A file system may answer with null instead of an empty array
+               // (e.g. when the path does not exist yet); treat that as "no
+               // files" rather than failing with a NullPointerException.
+               if (statuses == null) {
+                       LOG.warn("Path does not exist: {}", path);
+                       return files;
+               }
+
+               for (FileStatus status : statuses) {
+                       Path filePath = status.getPath();
+                       String fileName = filePath.getName();
+                       long modificationTime = status.getModificationTime();
+
+                       if (!isFiltered(fileName, modificationTime)) {
+                               files.add(filePath.toString());
+                               modificationTimes.put(fileName, modificationTime);
+                       }
+               }
+               return files;
+       }
+
+       /**
+        * Decides whether a listed file is skipped in this poll.
+        * 
+        * @return true for names starting with "." or containing "_COPYING_",
+        *         for already seen names under ONLY_NEW_FILES, and for files
+        *         whose modification time is not newer than the recorded one
+        */
+       private boolean isFiltered(String fileName, long modificationTime) {
+
+               if ((watchType == WatchType.ONLY_NEW_FILES && modificationTimes.containsKey(fileName))
+                               || fileName.startsWith(".") || fileName.contains("_COPYING_")) {
+                       return true;
+               } else {
+                       Long lastModification = modificationTimes.get(fileName);
+                       if (lastModification == null) {
+                               return false;
+                       } else {
+                               return lastModification >= modificationTime;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java
new file mode 100644
index 0000000..0882d9e
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.URI;
+
+/**
+ * Emits the text lines found in a section of a file.
+ * <p>
+ * Each input tuple describes one read: {@code f0} is the file location (used
+ * both to look up the file system and as the path to open), {@code f1} is the
+ * offset the stream is positioned at before reading, and {@code f2} is the
+ * upper bound for the stream position, where {@code -1} disables the bound.
+ */
+public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Opens the file named by {@code value.f0}, seeks to {@code value.f1} and
+        * collects lines until the end of the file is reached or the stream
+        * position exceeds {@code value.f2} (unless {@code value.f2} is
+        * {@code -1}).
+        *
+        * @param value
+        *            (location, start offset, end position or -1) of the read
+        * @param out
+        *            collector receiving one element per line
+        * @throws Exception
+        *             if the file cannot be opened, positioned or read
+        */
+       @Override
+       public void flatMap(Tuple3<String, Long, Long> value, Collector<String> out) throws Exception {
+               FSDataInputStream stream = FileSystem.get(new URI(value.f0)).open(new Path(value.f0));
+
+               // Everything after a successful open() runs inside the try block so
+               // the stream is released even if seek() or the reader setup fails.
+               try {
+                       stream.seek(value.f1);
+
+                       // NOTE(review): the reader decodes with the platform default
+                       // charset, and because it buffers, stream.getPos() reports how
+                       // far the underlying stream has been consumed rather than the
+                       // end of the line just returned -- confirm this is intended.
+                       BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
+                       String line;
+
+                       while ((line = reader.readLine()) != null
+                                       && (value.f2 == -1L || stream.getPos() <= value.f2)) {
+                               out.collect(line);
+                       }
+               } finally {
+                       // The readers hold no resources of their own; closing the
+                       // underlying stream is what releases the file.
+                       stream.close();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
new file mode 100644
index 0000000..5dfe4b2
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
+import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+
+/**
+ * Source function that emits every record produced by an {@link InputFormat}
+ * over the input splits handed out by the runtime's {@link InputSplitProvider}.
+ */
+public class FileSourceFunction extends RichSourceFunction<String> {
+       private static final long serialVersionUID = 1L;
+
+       /** Supplies the splits to read; obtained from the runtime context in {@link #open}. */
+       private InputSplitProvider provider;
+
+       /** Format used to open each split and read its records. */
+       private InputFormat<String, ?> inputFormat;
+
+       /** Factory for the serializer that creates the initial record instance. */
+       private TypeSerializerFactory<String> serializerFactory;
+
+       /**
+        * Creates a source reading records with the given format.
+        *
+        * @param format
+        *            input format used to read the splits
+        * @param typeInfo
+        *            type information from which the record serializer is created
+        */
+       public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
+               this.inputFormat = format;
+               this.serializerFactory = createSerializer(typeInfo);
+       }
+
+       /**
+        * Wraps the serializer created by {@code typeInfo} in the factory matching
+        * whether the serializer reports itself as stateful.
+        *
+        * @param typeInfo
+        *            type information providing the serializer and the type class
+        * @return a stateful or stateless serializer factory for the serializer
+        */
+       private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) {
+               TypeSerializer<String> serializer = typeInfo.createSerializer();
+
+               if (serializer.isStateful()) {
+                       return new RuntimeStatefulSerializerFactory<String>(serializer, typeInfo.getTypeClass());
+               } else {
+                       return new RuntimeStatelessSerializerFactory<String>(serializer,
+                                       typeInfo.getTypeClass());
+               }
+       }
+
+       /**
+        * Fetches the input split provider from the streaming runtime context and
+        * configures the input format with the task stub parameters.
+        *
+        * @param parameters
+        *            not used by this implementation
+        * @throws Exception
+        *             if the context lookup or the format configuration fails
+        */
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+               this.provider = context.getInputSplitProvider();
+               inputFormat.configure(context.getTaskStubParameters());
+       }
+
+       /**
+        * Reads all splits one after another, forwarding every non-null record to
+        * the collector, and closes the collector once all splits are consumed.
+        * <p>
+        * Failures while opening, reading or closing a split are propagated to the
+        * caller rather than printed and swallowed, so a broken input is reported
+        * instead of silently producing truncated output. The format is closed
+        * after each split even when reading fails.
+        *
+        * @param collector
+        *            receives the records read from the splits
+        * @throws Exception
+        *             if a split cannot be opened, read or closed
+        */
+       @Override
+       public void invoke(Collector<String> collector) throws Exception {
+               final TypeSerializer<String> serializer = serializerFactory.getSerializer();
+               final Iterator<InputSplit> splitIterator = getInputSplits();
+               @SuppressWarnings("unchecked")
+               final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;
+
+               while (splitIterator.hasNext()) {
+
+                       final InputSplit split = splitIterator.next();
+                       String record = serializer.createInstance();
+
+                       format.open(split);
+                       try {
+                               while (!format.reachedEnd()) {
+                                       // nextRecord may return null; such results are not emitted.
+                                       if ((record = format.nextRecord(record)) != null) {
+                                               collector.collect(record);
+                                       }
+                               }
+                       } finally {
+                               format.close();
+                       }
+               }
+               // Reached only when every split was read without an exception.
+               collector.close();
+       }
+
+       /**
+        * Returns an iterator that requests splits from the provider on demand. It
+        * fetches at most one split ahead and stays exhausted once the provider
+        * has returned {@code null}.
+        *
+        * @return iterator over the splits handed out by the provider
+        */
+       private Iterator<InputSplit> getInputSplits() {
+
+               return new Iterator<InputSplit>() {
+
+                       /** Split fetched by {@code hasNext()} but not yet returned by {@code next()}. */
+                       private InputSplit nextSplit;
+
+                       /** Set once the provider has returned {@code null}. */
+                       private boolean exhausted;
+
+                       @Override
+                       public boolean hasNext() {
+                               if (exhausted) {
+                                       return false;
+                               }
+
+                               if (nextSplit != null) {
+                                       return true;
+                               }
+
+                               InputSplit split = provider.getNextInputSplit();
+
+                               if (split != null) {
+                                       this.nextSplit = split;
+                                       return true;
+                               } else {
+                                       exhausted = true;
+                                       return false;
+                               }
+                       }
+
+                       @Override
+                       public InputSplit next() {
+                               if (this.nextSplit == null && !hasNext()) {
+                                       throw new NoSuchElementException();
+                               }
+
+                               // Hand out the buffered split and clear the slot so the
+                               // following hasNext() asks the provider again.
+                               final InputSplit tmp = this.nextSplit;
+                               this.nextSplit = null;
+                               return tmp;
+                       }
+
+                       @Override
+                       public void remove() {
+                               throw new UnsupportedOperationException();
+                       }
+               };
+       }
+}

Reply via email to