Repository: flink
Updated Branches:
  refs/heads/master ba7a19c10 -> 2b16c6042


[FLINK-1112] Additional checks for KeySelector group sorting and minor fixes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b16c604
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b16c604
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b16c604

Branch: refs/heads/master
Commit: 2b16c6042227145241046d497f9be0e43242c9fb
Parents: f83db14
Author: Fabian Hueske <fhue...@apache.org>
Authored: Tue Dec 16 15:38:08 2014 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Jan 15 18:13:18 2015 +0100

----------------------------------------------------------------------
 .../GroupingKeySelectorTranslationTest.java     |  35 +---
 .../api/java/operators/SortedGrouping.java      |   2 +-
 .../api/java/operators/UnsortedGrouping.java    |  17 +-
 .../apache/flink/api/scala/GroupedDataSet.scala |  19 ++-
 .../javaApiOperators/GroupReduceITCase.java     |   2 -
 ...tomPartitioningGroupingKeySelectorTest.scala | 162 +++++++++++--------
 6 files changed, 134 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b16c604/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java
index 8f446a7..a7b1167 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java
@@ -115,7 +115,7 @@ public class GroupingKeySelectorTranslationTest extends 
CompilerTestBase {
                        
                        data.groupBy(new 
TestKeySelector<Tuple3<Integer,Integer,Integer>>())
                                .withPartitioner(new TestPartitionerInt())
-                               .sortGroup(1, Order.ASCENDING)
+                               .sortGroup(new TestKeySelector<Tuple3<Integer, 
Integer, Integer>>(), Order.ASCENDING)
                                .reduceGroup(new 
IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
                                .print();
                        
@@ -137,39 +137,6 @@ public class GroupingKeySelectorTranslationTest extends 
CompilerTestBase {
        }
        
        @Test
-       public void testCustomPartitioningKeySelectorGroupReduceSorted2() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple4<Integer,Integer,Integer, Integer>> data 
= env.fromElements(new Tuple4<Integer,Integer,Integer,Integer>(0, 0, 0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       data
-                               .groupBy(new 
TestKeySelector<Tuple4<Integer,Integer,Integer,Integer>>())
-                               .withPartitioner(new TestPartitionerInt())
-                               .sortGroup(1, Order.ASCENDING)
-                               .sortGroup(2, Order.DESCENDING)
-                               .reduceGroup(new 
IdentityGroupReducer<Tuple4<Integer,Integer,Integer,Integer>>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
        public void testCustomPartitioningKeySelectorInvalidType() {
                try {
                        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/2b16c604/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 52a77a2..38c6c68 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -92,7 +92,7 @@ public class SortedGrouping<T> extends Grouping<T> {
                super(set, keys);
 
                if (!(this.keys instanceof Keys.SelectorFunctionKeys)) {
-                       throw new InvalidProgramException("Sorting on 
KeySelector only works for KeySelector grouping.");
+                       throw new InvalidProgramException("Sorting on 
KeySelector keys only works with KeySelector grouping.");
                }
 
                this.groupSortKeyPositions = 
keySelector.computeLogicalKeyPositions();

http://git-wip-us.apache.org/repos/asf/flink/blob/2b16c604/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 3f4839f..732c59b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -231,6 +231,10 @@ public class UnsortedGrouping<T> extends Grouping<T> {
         * @see Order
         */
        public SortedGrouping<T> sortGroup(int field, Order order) {
+               if (this.getKeys() instanceof Keys.SelectorFunctionKeys) {
+                       throw new InvalidProgramException("KeySelector grouping 
keys and field index group-sorting keys cannot be used together.");
+               }
+
                SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, 
this.keys, field, order);
                sg.customPartitioner = getCustomPartitioner();
                return sg;
@@ -248,6 +252,10 @@ public class UnsortedGrouping<T> extends Grouping<T> {
         * @see Order
         */
        public SortedGrouping<T> sortGroup(String field, Order order) {
+               if (this.getKeys() instanceof Keys.SelectorFunctionKeys) {
+                       throw new InvalidProgramException("KeySelector grouping 
keys and field expression group-sorting keys cannot be used together.");
+               }
+
                SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, 
this.keys, field, order);
                sg.customPartitioner = getCustomPartitioner();
                return sg;
@@ -256,7 +264,6 @@ public class UnsortedGrouping<T> extends Grouping<T> {
        /**
         * Sorts elements within a group on a key extracted by the specified 
{@link org.apache.flink.api.java.functions.KeySelector}
         * in the specified {@link Order}.</br>
-        * <b>Note: Only groups of Tuple elements and Pojos can be 
sorted.</b><br/>
         * Chaining {@link #sortGroup(KeySelector, Order)} calls is not 
supported.
         *
         * @param keySelector The KeySelector with which the group is sorted.
@@ -266,8 +273,14 @@ public class UnsortedGrouping<T> extends Grouping<T> {
         * @see Order
         */
        public <K> SortedGrouping<T> sortGroup(KeySelector<T, K> keySelector, 
Order order) {
+               if (!(this.getKeys() instanceof Keys.SelectorFunctionKeys)) {
+                       throw new InvalidProgramException("KeySelector 
group-sorting keys can only be used with KeySelector grouping keys.");
+               }
+
                TypeInformation<K> keyType = 
TypeExtractor.getKeySelectorTypes(keySelector, this.dataSet.getType());
-               return new SortedGrouping<T>(this.dataSet, this.keys, new 
Keys.SelectorFunctionKeys<T, K>(keySelector, this.dataSet.getType(), keyType), 
order);
+               SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, 
this.keys, new Keys.SelectorFunctionKeys<T, K>(keySelector, 
this.dataSet.getType(), keyType), order);
+               sg.customPartitioner = getCustomPartitioner();
+               return sg;
        }
        
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b16c604/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
index 9fc055d..7ac8dcd 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -67,6 +67,10 @@ class GroupedDataSet[T: ClassTag](
     if (field >= set.getType.getArity) {
       throw new IllegalArgumentException("Order key out of tuple bounds.")
     }
+    if (keys.isInstanceOf[Keys.SelectorFunctionKeys[_, _]]) {
+      throw new InvalidProgramException("KeySelector grouping keys and field 
index group-sorting " +
+        "keys cannot be used together.")
+    }
     if (groupSortKeySelector.nonEmpty) {
       throw new InvalidProgramException("Chaining sortGroup with KeySelector 
sorting is not " +
         "supported.")
@@ -87,6 +91,10 @@ class GroupedDataSet[T: ClassTag](
       throw new InvalidProgramException("Chaining sortGroup with KeySelector 
sorting is not" +
         "supported.")
     }
+    if (keys.isInstanceOf[Keys.SelectorFunctionKeys[_, _]]) {
+      throw new InvalidProgramException("KeySelector grouping keys and field 
expression " +
+        "group-sorting keys cannot be used together.")
+    }
     groupSortKeyPositions += Right(field)
     groupSortOrders += order
     this
@@ -103,6 +111,10 @@ class GroupedDataSet[T: ClassTag](
       throw new InvalidProgramException("Chaining sortGroup with KeySelector 
sorting is not" +
         "supported.")
     }
+    if (!keys.isInstanceOf[Keys.SelectorFunctionKeys[_, _]]) {
+      throw new InvalidProgramException("Sorting on KeySelector keys only 
works with KeySelector " +
+        "grouping.")
+    }
 
     groupSortOrders += order
     val keyType = implicitly[TypeInformation[K]]
@@ -121,7 +133,12 @@ class GroupedDataSet[T: ClassTag](
   private def maybeCreateSortedGrouping(): Grouping[T] = {
     groupSortKeySelector match {
       case Some(keySelector) =>
-        new SortedGrouping[T](set.javaSet, keys, keySelector, 
groupSortOrders(0))
+        if (partitioner == null) {
+          new SortedGrouping[T](set.javaSet, keys, keySelector, 
groupSortOrders(0))
+        } else {
+          new SortedGrouping[T](set.javaSet, keys, keySelector, 
groupSortOrders(0))
+            .withPartitioner(partitioner)
+        }
       case None =>
         if (groupSortKeyPositions.length > 0) {
           val grouping = groupSortKeyPositions(0) match {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b16c604/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 29a2c46..6bef0bc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -835,7 +835,6 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                                .reduceGroup(new 
Tuple3SortedGroupReduceWithCombine());
 
                reduceDs.writeAsCsv(resultPath);
-               reduceDs.print();
                env.execute();
 
                // return expected result
@@ -1302,7 +1301,6 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                public void combine(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) {
                        int sum = 0;
                        long key = 0;
-                       System.out.println("im in");
                        StringBuilder concat = new StringBuilder();
 
                        for (Tuple3<Integer, Long, String> next : values) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b16c604/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
index 93e3593..6b23649 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
@@ -25,32 +25,31 @@ import org.apache.flink.api.common.functions.Partitioner
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType
 import org.apache.flink.compiler.plan.SingleInputPlanNode
 import org.apache.flink.test.compiler.util.CompilerTestBase
-import scala.collection.immutable.Seq
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.InvalidProgramException
 
 class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
-  
+
   @Test
   def testCustomPartitioningKeySelectorReduce() {
     try {
       val env = ExecutionEnvironment.getExecutionEnvironment
-      
+
       val data = env.fromElements( (0,0) ).rebalance().setParallelism(4)
-      
+
       data
-          .groupBy( _._1 ).withPartitioner(new TestPartitionerInt())
-          .reduce( (a,b) => a )
-          .print()
-          
+        .groupBy( _._1 ).withPartitioner(new TestPartitionerInt())
+        .reduce( (a,b) => a )
+        .print()
+
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
-      
+
       val sink = op.getDataSinks.iterator().next()
       val keyRemovingMapper = 
sink.getInput.getSource.asInstanceOf[SingleInputPlanNode]
       val reducer = 
keyRemovingMapper.getInput.getSource.asInstanceOf[SingleInputPlanNode]
       val combiner = 
reducer.getInput.getSource.asInstanceOf[SingleInputPlanNode]
-      
+
       assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy)
       assertEquals(ShipStrategyType.FORWARD, 
keyRemovingMapper.getInput.getShipStrategy)
       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput.getShipStrategy)
@@ -63,26 +62,27 @@ class CustomPartitioningGroupingKeySelectorTest extends 
CompilerTestBase {
       }
     }
   }
-  
+
   @Test
   def testCustomPartitioningKeySelectorGroupReduce() {
     try {
       val env = ExecutionEnvironment.getExecutionEnvironment
-      
+
       val data = env.fromElements( (0,0) ).rebalance().setParallelism(4)
-      
+
       data
-          .groupBy( _._1 ).withPartitioner(new TestPartitionerInt())
-          .reduceGroup( iter => Seq(iter.next()) )
-          .print()
-          
+        .groupBy( _._1 ).withPartitioner(new TestPartitionerInt())
+        .reduce( (a, b) => a)
+        .print()
+
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
-      
+
       val sink = op.getDataSinks.iterator().next()
       val reducer = sink.getInput.getSource.asInstanceOf[SingleInputPlanNode]
+                        .getInput.getSource.asInstanceOf[SingleInputPlanNode]
       val combiner = 
reducer.getInput.getSource.asInstanceOf[SingleInputPlanNode]
-      
+
       assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy)
       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput.getShipStrategy)
       assertEquals(ShipStrategyType.FORWARD, combiner.getInput.getShipStrategy)
@@ -94,28 +94,63 @@ class CustomPartitioningGroupingKeySelectorTest extends 
CompilerTestBase {
       }
     }
   }
-  
+
+  @Test
+  def testCustomPartitioningIndexGroupReduceSorted() {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val data = env.fromElements( (0,0,0) ).rebalance().setParallelism(4)
+
+      data
+        .groupBy(0)
+        .withPartitioner(new TestPartitionerInt())
+        .sortGroup(1, Order.ASCENDING)
+        .reduce( (a,b) => a)
+        .print()
+
+      val p = env.createProgramPlan()
+      val op = compileNoStats(p)
+
+      val sink = op.getDataSinks.iterator().next()
+      val reducer = sink.getInput.getSource.asInstanceOf[SingleInputPlanNode]
+      val combiner = 
reducer.getInput.getSource.asInstanceOf[SingleInputPlanNode]
+
+      assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy)
+      assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput.getShipStrategy)
+      assertEquals(ShipStrategyType.FORWARD, combiner.getInput.getShipStrategy)
+
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+
   @Test
   def testCustomPartitioningKeySelectorGroupReduceSorted() {
     try {
       val env = ExecutionEnvironment.getExecutionEnvironment
-      
+
       val data = env.fromElements( (0,0,0) ).rebalance().setParallelism(4)
-      
+
       data
-          .groupBy( _._1 )
-          .withPartitioner(new TestPartitionerInt())
-          .sortGroup(1, Order.ASCENDING)
-          .reduceGroup( iter => Seq(iter.next()) )
-          .print()
-      
+        .groupBy(_._1)
+        .withPartitioner(new TestPartitionerInt())
+        .sortGroup(_._2, Order.ASCENDING)
+        .reduce( (a,b) => a)
+        .print()
+
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
-      
+
       val sink = op.getDataSinks.iterator().next()
       val reducer = sink.getInput.getSource.asInstanceOf[SingleInputPlanNode]
+                        .getInput.getSource.asInstanceOf[SingleInputPlanNode]
       val combiner = 
reducer.getInput.getSource.asInstanceOf[SingleInputPlanNode]
-      
+
       assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy)
       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput.getShipStrategy)
       assertEquals(ShipStrategyType.FORWARD, combiner.getInput.getShipStrategy)
@@ -127,31 +162,32 @@ class CustomPartitioningGroupingKeySelectorTest extends 
CompilerTestBase {
       }
     }
   }
-  
+
   @Test
   def testCustomPartitioningKeySelectorGroupReduceSorted2() {
     try {
       val env = ExecutionEnvironment.getExecutionEnvironment
-      
+
       val data = env.fromElements( (0,0,0,0) ).rebalance().setParallelism(4)
-      
+
       data
-          .groupBy( _._1 ).withPartitioner(new TestPartitionerInt())
-          .sortGroup(1, Order.ASCENDING)
-          .sortGroup(2, Order.DESCENDING)
-          .reduceGroup( iter => Seq(iter.next()) )
-          .print()
-      
+        .groupBy(0).withPartitioner(new TestPartitionerInt())
+        .sortGroup(1, Order.ASCENDING)
+        .sortGroup(2, Order.DESCENDING)
+        .reduce( (a,b) => a)
+        .print()
+
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
-      
+
       val sink = op.getDataSinks.iterator().next()
       val reducer = sink.getInput.getSource.asInstanceOf[SingleInputPlanNode]
       val combiner = 
reducer.getInput.getSource.asInstanceOf[SingleInputPlanNode]
-      
+
       assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy)
       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput.getShipStrategy)
       assertEquals(ShipStrategyType.FORWARD, combiner.getInput.getShipStrategy)
+
     }
     catch {
       case e: Exception => {
@@ -160,22 +196,22 @@ class CustomPartitioningGroupingKeySelectorTest extends 
CompilerTestBase {
       }
     }
   }
-  
+
   @Test
   def testCustomPartitioningKeySelectorInvalidType() {
     try {
       val env = ExecutionEnvironment.getExecutionEnvironment
-      
+
       val data = env.fromElements( (0, 0) ).rebalance().setParallelism(4)
-      
+
       try {
         data
-            .groupBy( _._1 )
-            .withPartitioner(new TestPartitionerLong())
+          .groupBy( _._1 )
+          .withPartitioner(new TestPartitionerLong())
         fail("Should throw an exception")
       }
       catch {
-        case e: InvalidProgramException => 
+        case e: InvalidProgramException =>
       }
     }
     catch {
@@ -185,23 +221,23 @@ class CustomPartitioningGroupingKeySelectorTest extends 
CompilerTestBase {
       }
     }
   }
-  
+
   @Test
   def testCustomPartitioningKeySelectorInvalidTypeSorted() {
     try {
       val env = ExecutionEnvironment.getExecutionEnvironment
-    
+
       val data = env.fromElements( (0, 0, 0) ).rebalance().setParallelism(4)
-      
+
       try {
         data
-            .groupBy( _._1 )
-            .sortGroup(1, Order.ASCENDING)
-            .withPartitioner(new TestPartitionerLong())
+          .groupBy( _._1 )
+          .sortGroup(1, Order.ASCENDING)
+          .withPartitioner(new TestPartitionerLong())
         fail("Should throw an exception")
       }
       catch {
-        case e: InvalidProgramException => 
+        case e: InvalidProgramException =>
       }
     }
     catch {
@@ -211,20 +247,20 @@ class CustomPartitioningGroupingKeySelectorTest extends 
CompilerTestBase {
       }
     }
   }
-  
+
   @Test
   def testCustomPartitioningTupleRejectCompositeKey() {
     try {
       val env = ExecutionEnvironment.getExecutionEnvironment
-      
+
       val data = env.fromElements( (0, 0, 0) ).rebalance().setParallelism(4)
-      
+
       try {
         data.groupBy( v => (v._1, v._2) ).withPartitioner(new 
TestPartitionerInt())
         fail("Should throw an exception")
       }
       catch {
-        case e: InvalidProgramException => 
+        case e: InvalidProgramException =>
       }
     }
     catch {
@@ -234,16 +270,16 @@ class CustomPartitioningGroupingKeySelectorTest extends 
CompilerTestBase {
       }
     }
   }
-  
+
   // 
----------------------------------------------------------------------------------------------
-  
+
   private class TestPartitionerInt extends Partitioner[Int] {
-  
+
     override def partition(key: Int, numPartitions: Int): Int = 0
   }
-    
+
   private class TestPartitionerLong extends Partitioner[Long] {
-  
+
     override def partition(key: Long, numPartitions: Int): Int = 0
   }
 }

Reply via email to