Github user heary-cao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17868#discussion_r114964280
  
    --- Diff: core/src/test/scala/org/apache/spark/ShuffleSuite.scala ---
    @@ -271,25 +271,49 @@ abstract class ShuffleSuite extends SparkFunSuite 
with Matchers with LocalSparkC
       test("[SPARK-4085] rerun map stage if reduce stage cannot find its local 
shuffle file") {
         val myConf = conf.clone().set("spark.test.noStageRetry", "false")
         sc = new SparkContext("local", "test", myConf)
    -    val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
    +    val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)
         rdd.count()
     
         // Delete one of the local shuffle blocks.
         val hashFile = sc.env.blockManager.diskBlockManager.getFile(new 
ShuffleBlockId(0, 0, 0))
         val sortFile = sc.env.blockManager.diskBlockManager.getFile(new 
ShuffleDataBlockId(0, 0, 0))
    -    assert(hashFile.exists() || sortFile.exists())
    +    val indexFile = sc.env.blockManager.diskBlockManager.getFile(new 
ShuffleIndexBlockId(0, 0, 0))
    +    assert(hashFile.exists() || (sortFile.exists() && indexFile.exists()))
     
         if (hashFile.exists()) {
           hashFile.delete()
         }
         if (sortFile.exists()) {
           sortFile.delete()
         }
    +    if (indexFile.exists()) {
    +      indexFile.delete()
    +    }
     
         // This count should retry the execution of the previous stage and 
rerun shuffle.
         rdd.count()
       }
     
    +  test("cannot find its local shuffle file if no execution of the stage 
and rerun shuffle") {
    +    val myConf = conf.clone().set("spark.test.noStageRetry", "true")
    +    sc = new SparkContext("local", "test", myConf)
    +    val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)
    +
    +    // Cannot find one of the local shuffle blocks.
    +    val hashFile = sc.env.blockManager.diskBlockManager.getFile(new 
ShuffleBlockId(0, 0, 0))
    +    val sortFile = sc.env.blockManager.diskBlockManager.getFile(new 
ShuffleDataBlockId(0, 0, 0))
    +    val indexFile = sc.env.blockManager.diskBlockManager.getFile(new 
ShuffleIndexBlockId(0, 0, 0))
    +    assert(!hashFile.exists() && !sortFile.exists() && !indexFile.exists())
    +
    +     rdd.count()
    --- End diff --
    
    sorry,
    I have modified it.
    thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to