Repository: spark
Updated Branches:
  refs/heads/master e0503a722 -> 014dc8471


[SPARK-22233][CORE] Allow user to filter out empty split in HadoopRDD

## What changes were proposed in this pull request?
Add a flag, spark.files.ignoreEmptySplits. When true, methods that use HadoopRDD
and NewHadoopRDD, such as SparkContext.textFile, will not create a partition for
input splits that are empty.
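
For context, a minimal usage sketch (not part of this patch) showing how the flag could be enabled from a driver program; the object name, app name, master URL, and input path below are illustrative placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone driver illustrating the new flag.
object IgnoreEmptySplitsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ignore-empty-splits-demo")       // placeholder app name
      .setMaster("local[*]")                        // placeholder master
      .set("spark.files.ignoreEmptySplits", "true") // defaults to false

    val sc = new SparkContext(conf)
    // With the flag on, zero-length input splits (e.g. empty part files
    // matched by the glob) no longer produce partitions in the RDD.
    val lines = sc.textFile("/tmp/output/part-*")   // placeholder input path
    println(s"partitions = ${lines.partitions.length}")
    sc.stop()
  }
}
```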

Author: liulijia <liuli...@meituan.com>

Closes #19464 from liutang123/SPARK-22233.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/014dc847
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/014dc847
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/014dc847

Branch: refs/heads/master
Commit: 014dc8471200518d63005eed531777d30d8a6639
Parents: e0503a7
Author: liulijia <liuli...@meituan.com>
Authored: Sat Oct 14 17:37:33 2017 +0900
Committer: hyukjinkwon <gurwls...@gmail.com>
Committed: Sat Oct 14 17:37:33 2017 +0900

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  |  6 ++
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 12 ++-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 13 ++-
 .../test/scala/org/apache/spark/FileSuite.scala | 95 ++++++++++++++++++--
 4 files changed, 112 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/014dc847/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 19336f8..ce013d6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -270,6 +270,12 @@ package object config {
     .longConf
     .createWithDefault(4 * 1024 * 1024)
 
+  private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
+    .doc("If true, methods that use HadoopRDD and NewHadoopRDD such as " +
+      "SparkContext.textFiles will not create a partition for input splits 
that are empty.")
+    .booleanConf
+    .createWithDefault(false)
+
   private[spark] val SECRET_REDACTION_PATTERN =
     ConfigBuilder("spark.redaction.regex")
       .doc("Regex to decide which Spark configuration properties and 
environment variables in " +

http://git-wip-us.apache.org/repos/asf/spark/blob/014dc847/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 23b3442..1f33c0a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
+import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.storage.StorageLevel
@@ -134,6 +134,8 @@ class HadoopRDD[K, V](
 
   private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
 
+  private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
@@ -195,8 +197,12 @@ class HadoopRDD[K, V](
     val jobConf = getJobConf()
     // add the credentials here as this can be called before SparkContext initialized
     SparkHadoopUtil.get.addCredentials(jobConf)
-    val inputFormat = getInputFormat(jobConf)
-    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
+    val inputSplits = if (ignoreEmptySplits) {
+      allInputSplits.filter(_.getLength > 0)
+    } else {
+      allInputSplits
+    }
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
       array(i) = new HadoopPartition(id, i, inputSplits(i))

http://git-wip-us.apache.org/repos/asf/spark/blob/014dc847/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 482875e..db4eac1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale}
 
+import scala.collection.JavaConverters.asScalaBufferConverter
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -34,7 +35,7 @@ import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
+import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
@@ -89,6 +90,8 @@ class NewHadoopRDD[K, V](
 
   private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
 
+  private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
+
   def getConf: Configuration = {
     val conf: Configuration = confBroadcast.value.value
     if (shouldCloneJobConf) {
@@ -121,8 +124,12 @@ class NewHadoopRDD[K, V](
         configurable.setConf(_conf)
       case _ =>
     }
-    val jobContext = new JobContextImpl(_conf, jobId)
-    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
+    val rawSplits = if (ignoreEmptySplits) {
+      allRowSplits.filter(_.getLength > 0)
+    } else {
+      allRowSplits
+    }
     val result = new Array[Partition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
       result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])

http://git-wip-us.apache.org/repos/asf/spark/blob/014dc847/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 0272818..4da4323 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 
-import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
+import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
 import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -347,10 +347,10 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test ("allow user to disable the output directory existence checking (old 
Hadoop API") {
-    val sf = new SparkConf()
-    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
-    sc = new SparkContext(sf)
+  test ("allow user to disable the output directory existence checking (old 
Hadoop API)") {
+    val conf = new SparkConf()
+    conf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(conf)
     val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
     randomRDD.saveAsTextFile(tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
@@ -380,9 +380,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test ("allow user to disable the output directory existence checking (new 
Hadoop API") {
-    val sf = new SparkConf()
-    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
-    sc = new SparkContext(sf)
+    val conf = new SparkConf()
+    conf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(conf)
     val randomRDD = sc.parallelize(
       Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
@@ -510,4 +510,83 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+    val conf = new SparkConf()
+    conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+    sc = new SparkContext(conf)
+
+    def testIgnoreEmptySplits(
+        data: Array[Tuple2[String, String]],
+        actualPartitionNum: Int,
+        expectedPartitionNum: Int): Unit = {
+      val output = new File(tempDir, "output")
+      sc.parallelize(data, actualPartitionNum)
+        .saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
+      for (i <- 0 until actualPartitionNum) {
+        assert(new File(output, s"part-0000$i").exists() === true)
+      }
+      val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
+      assert(hadoopRDD.partitions.length === expectedPartitionNum)
+      Utils.deleteRecursively(output)
+    }
+
+    // Ensure that if all of the splits are empty, we remove the splits correctly
+    testIgnoreEmptySplits(
+      data = Array.empty[Tuple2[String, String]],
+      actualPartitionNum = 1,
+      expectedPartitionNum = 0)
+
+    // Ensure that if no split is empty, we don't lose any splits
+    testIgnoreEmptySplits(
+      data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")),
+      actualPartitionNum = 2,
+      expectedPartitionNum = 2)
+
+    // Ensure that if part of the splits are empty, we remove the splits correctly
+    testIgnoreEmptySplits(
+      data = Array(("key1", "a"), ("key2", "a")),
+      actualPartitionNum = 5,
+      expectedPartitionNum = 2)
+  }
+
+  test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
+    val conf = new SparkConf()
+    conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+    sc = new SparkContext(conf)
+
+    def testIgnoreEmptySplits(
+        data: Array[Tuple2[String, String]],
+        actualPartitionNum: Int,
+        expectedPartitionNum: Int): Unit = {
+      val output = new File(tempDir, "output")
+      sc.parallelize(data, actualPartitionNum)
+        .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](output.getPath)
+      for (i <- 0 until actualPartitionNum) {
+        assert(new File(output, s"part-r-0000$i").exists() === true)
+      }
+      val hadoopRDD = sc.newAPIHadoopFile(new File(output, "part-r-*").getPath,
+        classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text])
+        .asInstanceOf[NewHadoopRDD[_, _]]
+      assert(hadoopRDD.partitions.length === expectedPartitionNum)
+      Utils.deleteRecursively(output)
+    }
+
+    // Ensure that if all of the splits are empty, we remove the splits correctly
+    testIgnoreEmptySplits(
+      data = Array.empty[Tuple2[String, String]],
+      actualPartitionNum = 1,
+      expectedPartitionNum = 0)
+
+    // Ensure that if no split is empty, we don't lose any splits
+    testIgnoreEmptySplits(
+      data = Array(("1", "a"), ("2", "a"), ("3", "b")),
+      actualPartitionNum = 2,
+      expectedPartitionNum = 2)
+
+    // Ensure that if part of the splits are empty, we remove the splits correctly
+    testIgnoreEmptySplits(
+      data = Array(("1", "a"), ("2", "b")),
+      actualPartitionNum = 5,
+      expectedPartitionNum = 2)
+  }
 }

