Repository: flink
Updated Branches:
  refs/heads/master 6c0a83e4f -> 233c0147b


[FLINK-3281] IndexOutOfBoundsException when range partition on empty DataSet


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/233c0147
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/233c0147
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/233c0147

Branch: refs/heads/master
Commit: 233c0147b1e9c18034d37f803e8974365a5a7d86
Parents: 6c0a83e
Author: chengxiang li <chengxiang...@intel.com>
Authored: Tue Feb 2 14:09:52 2016 +0800
Committer: chengxiang li <chengxiang...@intel.com>
Committed: Tue Feb 2 14:09:52 2016 +0800

----------------------------------------------------------------------
 .../operators/udf/RangeBoundaryBuilder.java     | 16 +++++-----
 .../api/scala/operators/PartitionITCase.scala   | 32 ++++++++++++++++++++
 2 files changed, 41 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/233c0147/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java
index cd163d3..09bd42d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java
@@ -60,13 +60,15 @@ public class RangeBoundaryBuilder<T> extends 
RichMapPartitionFunction<T, Object[
 
                int boundarySize = parallelism - 1;
                Object[][] boundaries = new Object[boundarySize][];
-               double avgRange = sampledData.size() / (double) parallelism;
-               int numKey = comparator.getFlatComparators().length;
-               for (int i = 1; i < parallelism; i++) {
-                       T record = sampledData.get((int) (i * avgRange));
-                       Object[] keys = new Object[numKey];
-                       comparator.extractKeys(record, keys, 0);
-                       boundaries[i-1] = keys;
+               if (sampledData.size() > 0) {
+                       double avgRange = sampledData.size() / (double) 
parallelism;
+                       int numKey = comparator.getFlatComparators().length;
+                       for (int i = 1; i < parallelism; i++) {
+                               T record = sampledData.get((int) (i * 
avgRange));
+                               Object[] keys = new Object[numKey];
+                               comparator.extractKeys(record, keys, 0);
+                               boundaries[i-1] = keys;
+                       }
                }
 
                out.collect(boundaries);

http://git-wip-us.apache.org/repos/asf/flink/blob/233c0147/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
index ca8bcd9..df13cd4 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
@@ -50,6 +50,38 @@ class PartitionITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(
   }
 
   @Test
+  def testEmptyHashPartition(): Unit = {
+    /*
+     * Test hash partition on an empty DataSet
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromCollection(Seq[Tuple1[String]]())
+
+    val unique = ds.partitionByHash(0)
+
+    unique.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+
+    expected = ""
+  }
+
+  @Test
+  def testEmptyRangePartition(): Unit = {
+    /*
+     * Test range partition on an empty DataSet (FLINK-3281)
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromCollection(Seq[Tuple1[String]]())
+
+    val unique = ds.partitionByRange(0)
+
+    unique.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+
+    expected = ""
+  }
+
+  @Test
   def testHashPartitionByTupleField(): Unit = {
     /*
      * Test hash partition by tuple field

Reply via email to