[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry

2023-03-11 Thread via GitHub


ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1133064529


##
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##
@@ -4572,6 +4572,48 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-42577: fail the job if a shuffle map stage attempts beyond the limitation") {
+    setupStageAbortTest(sc)
+    doAnswer(_ => 1).when(scheduler).maxStageAttempts

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry

2023-03-11 Thread via GitHub


ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1133064501


##
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##
@@ -4572,6 +4572,48 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-42577: fail the job if a shuffle map stage attempts beyond the limitation") {
+    setupStageAbortTest(sc)
+    doAnswer(_ => 1).when(scheduler).maxStageAttempts
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+    submit(reduceRdd, Array(0))
+
+    // Stage 0 got scheduled with 2 tasks.
+    assert(taskSets.size === 1 && taskSets(0).tasks.length === 2)
+    val stage0 = scheduler.stageIdToStage(0)
+
+    // Task 0 of stage 0 finished successfully on hostA and then executor on hostA got killed and
+    // shuffle data got lost. Then task 1 of stage 1 finished successfully on hostB.  Stage 0 will
+    // be resubmitted due to shuffle data lost.

Review Comment:
   updated






[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry

2023-03-07 Thread via GitHub


ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1128798692


##
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##
@@ -4572,6 +4572,48 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-42577: fail the job if a shuffle map stage attempts beyond the limitation") {
+    setupStageAbortTest(sc)
+    doAnswer(_ => 1).when(scheduler).maxStageAttempts

Review Comment:
   Thanks, will make the change.






[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry

2023-03-06 Thread via GitHub


ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1126637185


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -232,6 +232,13 @@ private[spark] class DAGScheduler(
     sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
       DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
 
+  /**
+   * Max stage attempts allowed before a stage is aborted.
+   */
+  private[scheduler] val maxStageAttempts: Int = {
+    Math.max(maxConsecutiveStageAttempts, sc.getConf.get(config.STAGE_MAX_ATTEMPTS))

Review Comment:
   Thanks, modified the default value instead.
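
For readers following the hunk quoted above, here is a minimal standalone sketch of the clamping it expresses: the effective limit is never lower than the consecutive-attempt limit. The object and method names are hypothetical and not part of Spark, and per the reply the PR may have dropped this clamp in favour of changing the default.

```scala
// Hypothetical stand-in for the limit computation in the quoted hunk; not Spark code.
object StageAttemptLimit {
  /**
   * Returns the larger of the two limits, so a configured overall limit
   * can never undercut the consecutive-attempt limit.
   */
  def effectiveMaxStageAttempts(maxConsecutiveAttempts: Int, configuredMaxAttempts: Int): Int =
    math.max(maxConsecutiveAttempts, configuredMaxAttempts)
}
```

With a consecutive limit of 4, a configured value of 2 is raised to 4, while a configured value of 16 is used unchanged.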






[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry

2023-03-06 Thread via GitHub


ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1126636319


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -2479,4 +2479,14 @@ package object config {
       .version("3.4.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val STAGE_MAX_ATTEMPTS =
+    ConfigBuilder("spark.stage.maxAttempts")
+      .doc("The max attempts for a stage, the spark job will be aborted if any of its stages is " +
+        "resubmitted multiple times beyond the limitation. The value should be no less " +
+        "than `spark.stage.maxConsecutiveAttempts` which defines the max attempts for " +
+        "fetch failures.")
+      .version("3.5.0")
+      .intConf
+      .createWithDefault(16)

Review Comment:
   Sounds good. Modified the default value to int max. Thanks.
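
As a rough illustration of what an "int max" default means for this setting, the sketch below resolves the limit from a plain key/value map and falls back to `Int.MaxValue`, which is effectively no limit. The key `spark.stage.maxAttempts` comes from the hunk above; the object, the `Map`-based lookup, and the parsing are assumptions for illustration and do not use Spark's `ConfigBuilder`.

```scala
// Hypothetical, Spark-free model of resolving the setting; not the real config API.
object StageMaxAttemptsConf {
  val Key: String = "spark.stage.maxAttempts"

  // Default described in the reply: Int.MaxValue, so stages are not capped unless configured.
  val Default: Int = Int.MaxValue

  /** Reads the limit from user settings, falling back to the default when the key is absent. */
  def maxAttempts(settings: Map[String, String]): Int =
    settings.get(Key).map(_.trim.toInt).getOrElse(Default)
}
```

Under this model, jobs that do not set the key keep the previous unbounded behaviour, and only users who opt in get a cap.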






[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry

2023-03-06 Thread via GitHub


ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1126633329


##
core/src/main/scala/org/apache/spark/scheduler/Stage.scala:
##
@@ -70,6 +70,7 @@ private[scheduler] abstract class Stage(
 
   /** The ID to use for the next new attempt for this stage. */
   private var nextAttemptId: Int = 0
+  private[scheduler] def getNextAttemptId(): Int = nextAttemptId

Review Comment:
   Thanks, done.
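
To show how an accessor like the one in the hunk above might feed an abort decision, here is a small self-contained sketch. `SketchStage`, `StageRetryPolicy`, and the `>=` comparison are all hypothetical; the actual check in the PR is not shown in this thread.

```scala
// Hypothetical model of a stage that counts its attempts; not Spark's Stage class.
final class SketchStage(val id: Int) {
  private var nextAttemptId: Int = 0

  /** The ID that the next attempt would receive, which equals the attempts made so far. */
  def getNextAttemptId: Int = nextAttemptId

  /** Records a new attempt and returns the ID assigned to it. */
  def makeNewAttempt(): Int = {
    val attemptId = nextAttemptId
    nextAttemptId += 1
    attemptId
  }
}

object StageRetryPolicy {
  /** Assumed rule: abort once the stage has already used up its allowed attempts. */
  def shouldAbort(stage: SketchStage, maxStageAttempts: Int): Boolean =
    stage.getNextAttemptId >= maxStageAttempts
}
```

With a limit of 1, as stubbed in the test quoted elsewhere in this thread, the first attempt is allowed and any resubmission would be rejected under this assumed rule.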






[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry

2023-03-06 Thread via GitHub


ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1126616705


##
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##
@@ -406,12 +406,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     blockManagerMaster = spy(new MyBlockManagerMaster(sc.getConf))
     doNothing().when(blockManagerMaster).updateRDDBlockVisibility(any(), any())
     scheduler = new MyDAGScheduler(
+    scheduler = spy(new MyDAGScheduler(

Review Comment:
   My bad... there was a commit fixing the compilation failure that was not pushed. Updated.






[GitHub] [spark] ivoson commented on a diff in pull request #40286: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry

2023-03-06 Thread via GitHub


ivoson commented on code in PR #40286:
URL: https://github.com/apache/spark/pull/40286#discussion_r1126616705


##
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##
@@ -406,12 +406,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     blockManagerMaster = spy(new MyBlockManagerMaster(sc.getConf))
     doNothing().when(blockManagerMaster).updateRDDBlockVisibility(any(), any())
     scheduler = new MyDAGScheduler(
+    scheduler = spy(new MyDAGScheduler(

Review Comment:
   My bad... there was a commit fixing the issue that was not pushed. Updated.


