Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/5964#discussion_r29823898
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -773,6 +774,64 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
+  ignore("no concurrent retries for stage attempts (SPARK-7308)") {
+    // see SPARK-7308 for a detailed description of the conditions this is trying to recreate.
+    // note that this is somewhat convoluted for a test case, but isn't actually very unusual
+    // under a real workload. Note that we only fail the first attempt of stage 2, but that
+    // could be enough to cause havoc.
+
+    val conf = new SparkConf().set("spark.executor.memory", "100m")
+    val clusterSc = new SparkContext("local-cluster[10,4,100]", "test-cluster", conf)
+    val bms = ArrayBuffer[BlockManagerId]()
+    val stageFailureCount = HashMap[Int, Int]()
+    clusterSc.addSparkListener(new SparkListener {
+      override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
+        bms += blockManagerAdded.blockManagerId
+      }
+      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
+        if (stageCompleted.stageInfo.failureReason.isDefined) {
+          val stage = stageCompleted.stageInfo.stageId
+          stageFailureCount(stage) = stageFailureCount.getOrElse(stage, 0) + 1
+        }
+      }
+    })
+    try {
+      val rawData = clusterSc.parallelize(1 to 1e6.toInt, 500).map { x => (x % 100) -> x }.cache()
+      rawData.count()
+      val aBm = bms(0)
+      val shuffled = rawData.groupByKey(100).mapPartitionsWithIndex { case (idx, itr) =>
+        // we want one failure quickly, and more failures after stage 0 has finished its
+        // second attempt
+        if (TaskContext.get().asInstanceOf[TaskContextImpl].stageAttemptId == 0) {
+          if (idx == 0) {
+            throw new FetchFailedException(aBm, 0, 0, idx,
+              cause = new RuntimeException("simulated fetch failure"))
+          } else if (idx > 0 && math.random < 0.1) {
+            Thread.sleep(10000)
+            throw new FetchFailedException(aBm, 0, 0, idx,
+              cause = new RuntimeException("simulated fetch failure"))
+          }
+        } else {
+          Thread.sleep(10000)
+        }
+        Thread.sleep(500) // want to make sure plenty of these finish after task 0 fails
+        itr.map { x => ((x._1 + 5) % 100) -> x._2 }
+      }
+      val shuffledAgain = shuffled.flatMap { case (k, vs) => vs.map(k -> _) }.groupByKey(100)
+      val data = shuffledAgain.mapPartitions { itr =>
+        Thread.sleep(10000)
+        itr.flatMap(_._2)
+      }.cache().collect()
+      val count = data.size
+      assert(count === 1e6.toInt)
+      assert(data.toSet === (1 to 1e6.toInt).toSet)
+      // we should only get one failure from stage 2, everything else should be fine
+      assert(stageFailureCount(2) === 1)
+      assert(stageFailureCount.getOrElse(1, 0) === 0)
+      assert(stageFailureCount.getOrElse(3, 0) <= 2) // TODO this should be 0, bug still exists
--- End diff --
Really, stage 3 should have 0 failures as well; I still need to solve that.
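
As an aside for anyone reading along: the failure accounting in this test is just a plain SparkListener that bumps a per-stage counter whenever a completed stage attempt carries a failureReason. A minimal standalone sketch of that pattern, using only the public listener API (the class name StageFailureCounter is illustrative, not part of the patch):

    import scala.collection.mutable

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    // Illustrative helper, not part of this patch: counts how many stage
    // attempts ended with a failure, keyed by stage id. Register it with
    // sc.addSparkListener(...) before running the job, then inspect
    // failuresByStage once the job has finished.
    class StageFailureCounter extends SparkListener {
      val failuresByStage = mutable.HashMap[Int, Int]()

      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        // StageInfo.failureReason is defined only for failed stage attempts
        if (stageCompleted.stageInfo.failureReason.isDefined) {
          val stage = stageCompleted.stageInfo.stageId
          failuresByStage(stage) = failuresByStage.getOrElse(stage, 0) + 1
        }
      }
    }

Once the remaining bug is fixed, the last assertion in the diff should tighten from `<= 2` to `stageFailureCount.getOrElse(3, 0) === 0`.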