Repository: flink Updated Branches: refs/heads/master 6c0a83e4f -> 233c0147b
[FLINK-3281] IndexOutOfBoundsException when range partition on empty DataSet Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/233c0147 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/233c0147 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/233c0147 Branch: refs/heads/master Commit: 233c0147b1e9c18034d37f803e8974365a5a7d86 Parents: 6c0a83e Author: chengxiang li <chengxiang...@intel.com> Authored: Tue Feb 2 14:09:52 2016 +0800 Committer: chengxiang li <chengxiang...@intel.com> Committed: Tue Feb 2 14:09:52 2016 +0800 ---------------------------------------------------------------------- .../operators/udf/RangeBoundaryBuilder.java | 16 +++++----- .../api/scala/operators/PartitionITCase.scala | 32 ++++++++++++++++++++ 2 files changed, 41 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/233c0147/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java index cd163d3..09bd42d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java @@ -60,13 +60,15 @@ public class RangeBoundaryBuilder<T> extends RichMapPartitionFunction<T, Object[ int boundarySize = parallelism - 1; Object[][] boundaries = new Object[boundarySize][]; - double avgRange = sampledData.size() / (double) parallelism; - int numKey = comparator.getFlatComparators().length; - for (int i = 1; i < parallelism; i++) { - T record = sampledData.get((int) (i * avgRange)); - Object[] keys 
= new Object[numKey]; - comparator.extractKeys(record, keys, 0); - boundaries[i-1] = keys; + if (sampledData.size() > 0) { + double avgRange = sampledData.size() / (double) parallelism; + int numKey = comparator.getFlatComparators().length; + for (int i = 1; i < parallelism; i++) { + T record = sampledData.get((int) (i * avgRange)); + Object[] keys = new Object[numKey]; + comparator.extractKeys(record, keys, 0); + boundaries[i-1] = keys; + } } out.collect(boundaries); http://git-wip-us.apache.org/repos/asf/flink/blob/233c0147/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala index ca8bcd9..df13cd4 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala @@ -50,6 +50,38 @@ class PartitionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase( } @Test + def testEmptyHashPartition(): Unit = { + /* + * Test hash partition on an empty DataSet + */ + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromCollection(Seq[Tuple1[String]]()) + + val unique = ds.partitionByHash(0) + + unique.writeAsText(resultPath, WriteMode.OVERWRITE) + env.execute() + + expected = "" + } + + @Test + def testEmptyRangePartition(): Unit = { + /* + * Test range partition on an empty DataSet + */ + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = env.fromCollection(Seq[Tuple1[String]]()) + + val unique = ds.partitionByRange(0) + + unique.writeAsText(resultPath, WriteMode.OVERWRITE) + env.execute() + + expected = "" + } + + @Test def testHashPartitionByTupleField(): Unit = { /* * Test hash partition by tuple field