Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19464#discussion_r144357376
  
    --- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
    @@ -510,4 +510,83 @@ class FileSuite extends SparkFunSuite with 
LocalSparkContext {
         }
       }
     
    +  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
    +    val conf = new SparkConf()
    +    conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, 
true)
    +    sc = new SparkContext(conf)
    +
    +    def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], 
numSlices: Int,
    +      outputSuffix: Int, checkPart: String, expectedPartitionNum: Int): 
Unit = {
    +      val dataRDD = sc.parallelize(data, numSlices)
    +      val output = new File(tempDir, "output" + outputSuffix)
    +      dataRDD.saveAsHadoopFile[TextOutputFormat[String, 
String]](output.getPath)
    +      assert(new File(output, checkPart).exists() === true)
    +      val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
    +      assert(hadoopRDD.partitions.length === expectedPartitionNum)
    +    }
    +
    +    // Ensure that if all of the splits are empty, we remove the splits 
correctly
    +    testIgnoreEmptySplits(
    +      data = Array.empty[Tuple2[String, String]],
    +      numSlices = 1,
    +      outputSuffix = 0,
    +      checkPart = "part-00000",
    +      expectedPartitionNum = 0)
    +
    +    // Ensure that if no split is empty, we don't lose any splits
    +    testIgnoreEmptySplits(
    +      data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")),
    +      numSlices = 2,
    +      outputSuffix = 1,
    +      checkPart = "part-00001",
    +      expectedPartitionNum = 2)
    +
    +    // Ensure that if part of the splits are empty, we remove the splits 
correctly
    +    testIgnoreEmptySplits(
    +      data = Array(("key1", "a"), ("key2", "a")),
    +      numSlices = 5,
    +      outputSuffix = 2,
    +      checkPart = "part-00004",
    +      expectedPartitionNum = 2)
    +  }
    +
    +  test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
    +    val conf = new SparkConf()
    +    conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, 
true)
    +    sc = new SparkContext(conf)
    +
    +    def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], 
numSlices: Int,
    +      outputSuffix: Int, checkPart: String, expectedPartitionNum: Int): 
Unit = {
    +      val dataRDD = sc.parallelize(data, numSlices)
    +      val output = new File(tempDir, "output" + outputSuffix)
    +      dataRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, 
String]](output.getPath)
    +      assert(new File(output, checkPart).exists() === true)
    +      val hadoopRDD = sc.textFile(new File(output, "part-r-*").getPath)
    --- End diff --
    
    I think we should _read_ it with new hadoop API to test `NewHadoopRDD` I 
guess?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to