Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19464#discussion_r144687101
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,83 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}
}
+ test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+ val conf = new SparkConf()
+ conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS,
true)
+ sc = new SparkContext(conf)
+
+ def testIgnoreEmptySplits(
+ data: Array[Tuple2[String, String]],
+ actualPartitionNum: Int,
+ expectedPartitionNum: Int): Unit = {
+ val output = new File(tempDir, "output")
+ sc.parallelize(data, actualPartitionNum)
+ .saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
+ for (i <- 0 until actualPartitionNum) {
+ assert(new File(output, s"part-0000$i").exists() === true)
+ }
+ val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
+ assert(hadoopRDD.partitions.length === expectedPartitionNum)
+ Utils.deleteRecursively(output)
+ }
+
+ // Ensure that if all of the splits are empty, we remove the splits
correctly
+ testIgnoreEmptySplits(
+ data = Array.empty[Tuple2[String, String]],
+ actualPartitionNum = 1,
+ expectedPartitionNum = 0)
+
+ // Ensure that if no split is empty, we don't lose any splits
+ testIgnoreEmptySplits(
+ data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")),
+ actualPartitionNum = 2,
+ expectedPartitionNum = 2)
+
+ // Ensure that if part of the splits are empty, we remove the splits
correctly
+ testIgnoreEmptySplits(
+ data = Array(("key1", "a"), ("key2", "a")),
+ actualPartitionNum = 5,
+ expectedPartitionNum = 2)
+ }
+
+ test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
+ val conf = new SparkConf()
+ conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS,
true)
+ sc = new SparkContext(conf)
+
+ def testIgnoreEmptySplits(
+ data: Array[Tuple2[String, String]],
+ actualPartitionNum: Int,
+ expectedPartitionNum: Int): Unit = {
+ val output = new File(tempDir, "output")
+ sc.parallelize(data, actualPartitionNum)
+ .saveAsNewAPIHadoopFile[NewTextOutputFormat[String,
String]](output.getPath)
+ for (i <- 0 until actualPartitionNum) {
+ assert(new File(output, s"part-r-0000$i").exists() === true)
+ }
+ val hadoopRDD = sc.newAPIHadoopFile(new File(output,
"part-r-*").getPath,
+ classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text])
+ .asInstanceOf[NewHadoopRDD[_, _]]
--- End diff ---
nit:
```scala
val hadoopRDD = sc.newAPIHadoopFile(
  new File(output, "part-r-*").getPath,
  classOf[NewTextInputFormat],
  classOf[LongWritable],
  classOf[Text]).asInstanceOf[NewHadoopRDD[_, _]]
```
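For reference, a sketch of how the reformatted call could sit in the rest of the helper; the trailing assertion and cleanup are assumed to mirror the old-API helper earlier in this diff, since the hunk is cut off at this line:

```scala
// Read back only the part files written above; with IGNORE_EMPTY_SPLITS
// enabled, empty splits should not show up as partitions of the RDD.
val hadoopRDD = sc.newAPIHadoopFile(
  new File(output, "part-r-*").getPath,
  classOf[NewTextInputFormat],
  classOf[LongWritable],
  classOf[Text]).asInstanceOf[NewHadoopRDD[_, _]]
assert(hadoopRDD.partitions.length === expectedPartitionNum)
Utils.deleteRecursively(output)
```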
---