Repository: flink
Updated Branches:
  refs/heads/master 153a67881 -> 544abb937


http://git-wip-us.apache.org/repos/asf/flink/blob/544abb93/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 1193da5..6791741 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -18,20 +18,16 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
+import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
 import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.DataSet;
 
 /**
@@ -89,9 +85,9 @@ public class ReduceOperator<IN> extends 
SingleInputUdfOperator<IN, IN, ReduceOpe
                // offset semantic information by extracted key fields
                if(props != null &&
                                this.grouper != null &&
-                               this.grouper.keys instanceof 
Keys.SelectorFunctionKeys) {
+                               this.grouper.keys instanceof 
SelectorFunctionKeys) {
 
-                       int offset = ((Keys.SelectorFunctionKeys<?,?>) 
this.grouper.keys).getKeyType().getTotalFields();
+                       int offset = ((SelectorFunctionKeys<?,?>) 
this.grouper.keys).getKeyType().getTotalFields();
                        if(this.grouper instanceof SortedGrouping) {
                                offset += ((SortedGrouping<?>) 
this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
                        }
@@ -109,9 +105,9 @@ public class ReduceOperator<IN> extends 
SingleInputUdfOperator<IN, IN, ReduceOpe
                // distinguish between grouped reduce and non-grouped reduce
                if (grouper == null) {
                        // non grouped reduce
-                       UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<IN, IN>(getInputType(), getInputType());
+                       UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<>(getInputType(), getInputType());
                        ReduceOperatorBase<IN, ReduceFunction<IN>> po =
-                                       new ReduceOperatorBase<IN, 
ReduceFunction<IN>>(function, operatorInfo, new int[0], name);
+                                       new ReduceOperatorBase<>(function, 
operatorInfo, new int[0], name);
                        
                        po.setInput(input);
                        // the parallelism for a non grouped reduce can only be 
1
@@ -120,13 +116,14 @@ public class ReduceOperator<IN> extends 
SingleInputUdfOperator<IN, IN, ReduceOpe
                        return po;
                }
                
-               if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) {
+               if (grouper.getKeys() instanceof SelectorFunctionKeys) {
                        
                        // reduce with key selector function
                        @SuppressWarnings("unchecked")
-                       Keys.SelectorFunctionKeys<IN, ?> selectorKeys = 
(Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys();
-                       
-                       MapOperatorBase<?, IN, ?> po = 
translateSelectorFunctionReducer(selectorKeys, function, getInputType(), name, 
input, getParallelism());
+                       SelectorFunctionKeys<IN, ?> selectorKeys = 
(SelectorFunctionKeys<IN, ?>) grouper.getKeys();
+
+                       
org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> po =
+                               translateSelectorFunctionReducer(selectorKeys, 
function, getInputType(), name, input, getParallelism());
                        ((PlanUnwrappingReduceOperator<?, ?>) 
po.getInput()).setCustomPartitioner(grouper.getCustomPartitioner());
                        
                        return po;
@@ -135,9 +132,9 @@ public class ReduceOperator<IN> extends 
SingleInputUdfOperator<IN, IN, ReduceOpe
                        
                        // reduce with field positions
                        int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();
-                       UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<IN, IN>(getInputType(), getInputType());
+                       UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<>(getInputType(), getInputType());
                        ReduceOperatorBase<IN, ReduceFunction<IN>> po =
-                                       new ReduceOperatorBase<IN, 
ReduceFunction<IN>>(function, operatorInfo, logicalKeyPositions, name);
+                                       new ReduceOperatorBase<>(function, 
operatorInfo, logicalKeyPositions, name);
                        
                        po.setCustomPartitioner(grouper.getCustomPartitioner());
                        
@@ -153,30 +150,24 @@ public class ReduceOperator<IN> extends 
SingleInputUdfOperator<IN, IN, ReduceOpe
        
        // 
--------------------------------------------------------------------------------------------
        
-       private static <T, K> MapOperatorBase<Tuple2<K, T>, T, ?> 
translateSelectorFunctionReducer(Keys.SelectorFunctionKeys<T, ?> rawKeys,
-                       ReduceFunction<T> function, TypeInformation<T> 
inputType, String name, Operator<T> input, int parallelism)
+       private static <T, K> 
org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> 
translateSelectorFunctionReducer(
+               SelectorFunctionKeys<T, ?> rawKeys,
+               ReduceFunction<T> function,
+               TypeInformation<T> inputType,
+               String name,
+               Operator<T> input,
+               int parallelism)
        {
                @SuppressWarnings("unchecked")
-               final Keys.SelectorFunctionKeys<T, K> keys = 
(Keys.SelectorFunctionKeys<T, K>) rawKeys;
-               
-               TypeInformation<Tuple2<K, T>> typeInfoWithKey = new 
TupleTypeInfo<Tuple2<K, T>>(keys.getKeyType(), inputType);
-               
-               KeyExtractingMapper<T, K> extractor = new 
KeyExtractingMapper<T, K>(keys.getKeyExtractor());
+               final SelectorFunctionKeys<T, K> keys = 
(SelectorFunctionKeys<T, K>) rawKeys;
                
-               PlanUnwrappingReduceOperator<T, K> reducer = new 
PlanUnwrappingReduceOperator<T, K>(function, keys, name, inputType, 
typeInfoWithKey);
+               TypeInformation<Tuple2<K, T>> typeInfoWithKey = 
SelectorFunctionKeys.createTypeWithKey(keys);
+               Operator<Tuple2<K, T>> keyedInput = 
SelectorFunctionKeys.appendKeyExtractor(input, keys);
                
-               MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> 
keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, 
Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, 
T>>(inputType, typeInfoWithKey), "Key Extractor");
-               MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> 
keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, 
T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, 
T>(typeInfoWithKey, inputType), "Key Extractor");
-
-               keyExtractingMap.setInput(input);
-               reducer.setInput(keyExtractingMap);
-               keyRemovingMap.setInput(reducer);
-               
-               // set parallelism
-               keyExtractingMap.setParallelism(input.getParallelism());
+               PlanUnwrappingReduceOperator<T, K> reducer = new 
PlanUnwrappingReduceOperator<>(function, keys, name, inputType, 
typeInfoWithKey);
+               reducer.setInput(keyedInput);
                reducer.setParallelism(parallelism);
-               keyRemovingMap.setParallelism(parallelism);
-               
-               return keyRemovingMap;
+
+               return SelectorFunctionKeys.appendKeyRemover(reducer, keys);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/544abb93/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index b488dd1..6092d14 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.Utils;
@@ -124,6 +125,16 @@ public class SortedGrouping<T> extends Grouping<T> {
        protected Order[] getGroupSortOrders() {
                return this.groupSortOrders;
        }
+
+       protected Ordering getGroupOrdering() {
+
+               Ordering o = new Ordering();
+               for(int i=0; i < this.groupSortKeyPositions.length; i++) {
+                       o.appendOrdering(this.groupSortKeyPositions[i], null, 
this.groupSortOrders[i]);
+               }
+
+               return o;
+       }
        
        /**
         * Uses a custom partitioner for the grouping.

Reply via email to