GitHub user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5964#discussion_r29823898
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -773,6 +774,64 @@ class DAGSchedulerSuite
         assertDataStructuresEmpty()
       }
     
    +  ignore("no concurrent retries for stage attempts (SPARK-7308)") {
    +    // see SPARK-7308 for a detailed description of the conditions this is trying to recreate.
    +    // note that this is somewhat convoluted for a test case, but isn't actually very unusual
    +    // under a real workload.  Note that we only fail the first attempt of stage 2, but that
    +    // could be enough to cause havoc.
    +
    +    val conf = new SparkConf().set("spark.executor.memory", "100m")
    +    val clusterSc = new SparkContext("local-cluster[10,4,100]", "test-cluster", conf)
    +    val bms = ArrayBuffer[BlockManagerId]()
    +    val stageFailureCount = HashMap[Int, Int]()
    +    clusterSc.addSparkListener(new SparkListener {
    +      override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
    +        bms += blockManagerAdded.blockManagerId
    +      }
    +      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    +        if (stageCompleted.stageInfo.failureReason.isDefined) {
    +          val stage = stageCompleted.stageInfo.stageId
    +          stageFailureCount(stage) = stageFailureCount.getOrElse(stage, 0) + 1
    +        }
    +      }
    +    })
    +    try {
    +      val rawData = clusterSc.parallelize(1 to 1e6.toInt, 500).map{x => (x % 100) -> x}.cache()
    +      rawData.count()
    +      val aBm = bms(0)
    +      val shuffled = rawData.groupByKey(100).mapPartitionsWithIndex{ case (idx, itr) =>
    +        // we want one failure quickly, and more failures after stage 0 has finished its
    +        // second attempt
    +        if (TaskContext.get().asInstanceOf[TaskContextImpl].stageAttemptId == 0) {
    +          if (idx == 0) {
    +            throw new FetchFailedException(aBm, 0, 0, idx, cause = new RuntimeException("simulated fetch failure"))
    +          } else if (idx > 0 && math.random < 0.1) {
    +            Thread.sleep(10000)
    +            throw new FetchFailedException(aBm, 0, 0, idx, cause = new RuntimeException("simulated fetch failure"))
    +          }
    +        } else {
    +          Thread.sleep(10000)
    +        }
    +        Thread.sleep(500) // want to make sure plenty of these finish after task 0 fails
    +        itr.map{x => ((x._1 + 5) % 100) -> x._2 }
    +      }
    +      val shuffledAgain = shuffled.flatMap{ case (k, vs) => vs.map{k -> _}}.groupByKey(100)
    +      val data = shuffledAgain.mapPartitions { itr =>
    +        Thread.sleep(10000)
    +        itr.flatMap{_._2}
    +      }.cache().collect()
    +      val count = data.size
    +      assert(count === 1e6.toInt)
    +      assert(data.toSet === (1 to 1e6.toInt).toSet)
    +      // we should only get one failure from stage 2, everything else should be fine
    +      assert(stageFailureCount(2) === 1)
    +      assert(stageFailureCount.getOrElse(1, 0) === 0)
    +      assert(stageFailureCount.getOrElse(3, 0) <= 2)  // TODO this should be 0, bug still exists
    --- End diff --
    
    Really, stage 3 should have 0 failures as well; I still need to solve that.
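    
    As an aside, here is a minimal sketch (hypothetical, not part of this diff; the class and field names are made up) of a listener that would catch overlapping attempts directly, instead of inferring them from per-stage failure counts:
    
    import scala.collection.mutable
    
    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}
    
    // Hypothetical helper: records any stage for which a second attempt is
    // submitted before an earlier attempt completes, which is the concurrency
    // described in SPARK-7308.
    class ConcurrentAttemptListener extends SparkListener {
      // Listener events arrive on the single listener-bus thread, so plain
      // mutable state is safe here.
      private val runningAttempts = mutable.HashMap[Int, Int]().withDefaultValue(0)
      val overlappingStages = mutable.Set[Int]()
    
      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
        val id = stageSubmitted.stageInfo.stageId
        runningAttempts(id) += 1
        if (runningAttempts(id) > 1) {
          overlappingStages += id
        }
      }
    
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        runningAttempts(stageCompleted.stageInfo.stageId) -= 1
      }
    }
    
    Registered via clusterSc.addSparkListener, the test could then assert that overlappingStages is empty once the fix is complete.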

