Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/15213#discussion_r80993655
--- Diff:
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2105,6 +2106,61 @@ class DAGSchedulerSuite extends SparkFunSuite with
LocalSparkContext with Timeou
assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA,
shuffleDepC))
}
+ test("SPARK-17644: After one stage is aborted for too many failed
attempts, subsequent stages" +
+ " still behave correctly on fetch failures") {
+ // Runs a job that always encounters a fetch failure, so should
eventually be aborted
+ def runJobWithPersistentFetchFailure: Unit = {
+ val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x,
1)).groupByKey()
+ val shuffleHandle =
+ rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _,
_]].shuffleHandle
+ rdd1.map {
+ case (x, _) if (x == 1) =>
+ throw new FetchFailedException(
+ BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0,
"test")
+ case (x, _) => x
+ }.count()
+ }
+
+ // Runs a job that encounters a single fetch failure but succeeds on
the second attempt
+ def runJobWithTemporaryFetchFailure: Unit = {
+ object FailThisAttempt {
+ val _fail = new AtomicBoolean(true)
+ }
+ val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x,
1)).groupByKey()
+ val shuffleHandle =
+ rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _,
_]].shuffleHandle
+ rdd1.map {
+ case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false)
=>
+ throw new FetchFailedException(
+ BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0,
"test")
+ }
+ }
+
+ failAfter(10.seconds) {
+ val e = intercept[SparkException] {
+ runJobWithPersistentFetchFailure
+ }
+
assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
+ }
+
+ // Run a second job that will fail due to a fetch failure.
+ // This job will hang without the fix for SPARK-17644.
+ failAfter(10.seconds) {
+ val e = intercept[SparkException] {
+ runJobWithPersistentFetchFailure
+ }
+
assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
+ }
+
+ failAfter(10.seconds) {
+ try {
+ runJobWithTemporaryFetchFailure
+ } catch {
+ case e: Throwable => fail("A job with one fetch failure should
eventually succeed")
--- End diff --
nit: you don't need to specifically catch an exception and call `fail`, the
test will automatically fail from an unhandled exception
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]