Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19464#discussion_r144357376

--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,83 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+    val conf = new SparkConf()
+    conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+    sc = new SparkContext(conf)
+
+    def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int,
+        outputSuffix: Int, checkPart: String, expectedPartitionNum: Int): Unit = {
+      val dataRDD = sc.parallelize(data, numSlices)
+      val output = new File(tempDir, "output" + outputSuffix)
+      dataRDD.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
+      assert(new File(output, checkPart).exists() === true)
+      val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
+      assert(hadoopRDD.partitions.length === expectedPartitionNum)
+    }
+
+    // Ensure that if all of the splits are empty, we remove the splits correctly
+    testIgnoreEmptySplits(
+      data = Array.empty[Tuple2[String, String]],
+      numSlices = 1,
+      outputSuffix = 0,
+      checkPart = "part-00000",
+      expectedPartitionNum = 0)
+
+    // Ensure that if no split is empty, we don't lose any splits
+    testIgnoreEmptySplits(
+      data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")),
+      numSlices = 2,
+      outputSuffix = 1,
+      checkPart = "part-00001",
+      expectedPartitionNum = 2)
+
+    // Ensure that if part of the splits are empty, we remove the splits correctly
+    testIgnoreEmptySplits(
+      data = Array(("key1", "a"), ("key2", "a")),
+      numSlices = 5,
+      outputSuffix = 2,
+      checkPart = "part-00004",
+      expectedPartitionNum = 2)
+  }
+
+  test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
+    val conf = new SparkConf()
+    conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+    sc = new SparkContext(conf)
+
+    def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int,
+        outputSuffix: Int, checkPart: String, expectedPartitionNum: Int): Unit = {
+      val dataRDD = sc.parallelize(data, numSlices)
+      val output = new File(tempDir, "output" + outputSuffix)
+      dataRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](output.getPath)
+      assert(new File(output, checkPart).exists() === true)
+      val hadoopRDD = sc.textFile(new File(output, "part-r-*").getPath)
--- End diff --

I think we should _read_ it with the new Hadoop API here so that it actually tests `NewHadoopRDD`, I guess?
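For reference, a minimal sketch of what reading through the new Hadoop API could look like at that point in the test. This is only an illustration of the suggestion, not the final patch; it assumes FileSuite has (or adds) an alias like `import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}`, and it reuses the `output` and `expectedPartitionNum` names from the diff above:

```scala
// Read the saved output back via the new Hadoop API so the partition-count
// assertion exercises NewHadoopRDD rather than the old-API HadoopRDD that
// sc.textFile creates.
import org.apache.hadoop.io.{LongWritable, Text}

val newHadoopRDD = sc.newAPIHadoopFile(
  new File(output, "part-r-*").getPath,
  classOf[NewTextInputFormat],
  classOf[LongWritable],
  classOf[Text])
assert(newHadoopRDD.partitions.length === expectedPartitionNum)
```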