[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1133064529

## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ##
@@ -4572,6 +4572,48 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-42577: fail the job if a shuffle map stage attempts beyond the limitation") {
+    setupStageAbortTest(sc)
+    doAnswer(_ => 1).when(scheduler).maxStageAttempts

Review Comment:
done

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1133064501

## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ##
@@ -4572,6 +4572,48 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-42577: fail the job if a shuffle map stage attempts beyond the limitation") {
+    setupStageAbortTest(sc)
+    doAnswer(_ => 1).when(scheduler).maxStageAttempts
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+    submit(reduceRdd, Array(0))
+
+    // Stage 0 got scheduled with 2 tasks.
+    assert(taskSets.size === 1 && taskSets(0).tasks.length === 2)
+    val stage0 = scheduler.stageIdToStage(0)
+
+    // Task 0 of stage 0 finished successfully on hostA and then executor on hostA got killed and
+    // shuffle data got lost. Then task 1 of stage 1 finished successfully on hostB. Stage 0 will
+    // be resubmitted due to shuffle data lost.

Review Comment:
updated
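The scenario exercised by the test quoted above can be sketched without Spark. This is a minimal model under stated assumptions, not the DAGScheduler implementation: `StageRetryModel`, `StageModel`, `submit`, `Submitted` and `Aborted` are hypothetical names introduced for illustration, and the `>=` comparison against the attempt counter is an assumption about how the limit is checked. With the limit mocked to 1, as in the test, the first submission of stage 0 goes through and the resubmission triggered by lost shuffle data is rejected.

```scala
// Spark-free sketch: all names here are hypothetical and exist only for illustration.
object StageRetryModel {

  /** Tracks how many attempts of a stage have been started. */
  final class StageModel(val id: Int) {
    private var nextAttemptId: Int = 0
    def getNextAttemptId: Int = nextAttemptId
    def makeNewAttempt(): Unit = nextAttemptId += 1
  }

  sealed trait SubmitResult
  case object Submitted extends SubmitResult
  final case class Aborted(reason: String) extends SubmitResult

  /**
   * Starts a new attempt unless the stage has already used up its attempts.
   * Assumption: the limit is compared against the id of the next attempt.
   */
  def submit(stage: StageModel, maxStageAttempts: Int): SubmitResult =
    if (stage.getNextAttemptId >= maxStageAttempts) {
      Aborted(s"Stage ${stage.id} reached the limit of $maxStageAttempts attempt(s)")
    } else {
      stage.makeNewAttempt()
      Submitted
    }

  def main(args: Array[String]): Unit = {
    val stage0 = new StageModel(0)
    // First attempt of stage 0.
    println(submit(stage0, maxStageAttempts = 1))
    // Resubmission after the shuffle output on the lost executor is gone.
    println(submit(stage0, maxStageAttempts = 1))
  }
}
```

In the real scheduler an abort of a stage fails the jobs that depend on it; the model only returns a value so that the decision is easy to inspect.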
[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1128798692

## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ##
@@ -4572,6 +4572,48 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-42577: fail the job if a shuffle map stage attempts beyond the limitation") {
+    setupStageAbortTest(sc)
+    doAnswer(_ => 1).when(scheduler).maxStageAttempts

Review Comment:
Thanks, will make the change.
[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1126637185

## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ##
@@ -232,6 +232,13 @@ private[spark] class DAGScheduler(
     sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
       DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
 
+  /**
+   * Max stage attempts allowed before a stage is aborted.
+   */
+  private[scheduler] val maxStageAttempts: Int = {
+    Math.max(maxConsecutiveStageAttempts, sc.getConf.get(config.STAGE_MAX_ATTEMPTS))

Review Comment:
Thanks, modified the default value instead.
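For readers following the thread, the `Math.max` in the diff quoted above keeps the overall stage limit from dropping below the consecutive-attempts limit. A small Spark-free sketch of that clamp follows; `StageAttemptClamp` and `effectiveMaxStageAttempts` are hypothetical names, and the numbers are arbitrary inputs rather than Spark defaults. Note that the reply says the default value was changed instead, so this mirrors the diff as quoted and may not match the final code.

```scala
// Spark-free sketch of the clamp in the quoted diff; names are hypothetical.
object StageAttemptClamp {

  /** Returns the larger of the two limits, so the overall cap is never below the consecutive cap. */
  def effectiveMaxStageAttempts(maxConsecutiveStageAttempts: Int, configuredMaxAttempts: Int): Int =
    math.max(maxConsecutiveStageAttempts, configuredMaxAttempts)

  def main(args: Array[String]): Unit = {
    // Configured value above the consecutive limit: the configured value is used.
    println(effectiveMaxStageAttempts(4, 16)) // prints 16
    // Configured value below the consecutive limit: it is raised to the consecutive limit.
    println(effectiveMaxStageAttempts(4, 2)) // prints 4
  }
}
```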
[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1126636319

## core/src/main/scala/org/apache/spark/internal/config/package.scala: ##
@@ -2479,4 +2479,14 @@ package object config {
       .version("3.4.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val STAGE_MAX_ATTEMPTS =
+    ConfigBuilder("spark.stage.maxAttempts")
+      .doc("The max attempts for a stage, the spark job will be aborted if any of its stages is " +
+        "resubmitted multiple times beyond the limitation. The value should be no less " +
+        "than `spark.stage.maxConsecutiveAttempts` which defines the max attempts for " +
+        "fetch failures.")
+      .version("3.5.0")
+      .intConf
+      .createWithDefault(16)

Review Comment:
Sounds good. Modified the default value to int max. Thanks.
[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1126633329

## core/src/main/scala/org/apache/spark/scheduler/Stage.scala: ##
@@ -70,6 +70,7 @@ private[scheduler] abstract class Stage(
   /** The ID to use for the next new attempt for this stage. */
   private var nextAttemptId: Int = 0
+  private[scheduler] def getNextAttemptId(): Int = nextAttemptId

Review Comment:
Thanks, done.
[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1126616705

## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ##
@@ -406,12 +406,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     blockManagerMaster = spy(new MyBlockManagerMaster(sc.getConf))
     doNothing().when(blockManagerMaster).updateRDDBlockVisibility(any(), any())
     scheduler = new MyDAGScheduler(
+    scheduler = spy(new MyDAGScheduler(

Review Comment:
My bad... there was a commit fixing the compilation failure that had not been pushed. Updated.
[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1126616705

## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ##
@@ -406,12 +406,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     blockManagerMaster = spy(new MyBlockManagerMaster(sc.getConf))
     doNothing().when(blockManagerMaster).updateRDDBlockVisibility(any(), any())
     scheduler = new MyDAGScheduler(
+    scheduler = spy(new MyDAGScheduler(

Review Comment:
My bad...there was a commit fixing the issue not pushed. Updated