[GitHub] spark pull request #20082: [SPARK-22897][CORE]: Expose stageAttemptId in Tas...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20082 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r159247576 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala --- @@ -79,6 +79,7 @@ private[spark] abstract class Task[T]( SparkEnv.get.blockManager.registerTask(taskAttemptId) context = new TaskContextImpl( stageId, + stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal --- End diff -- ah, so `stageAttemptId` is already exposed in the developer API, so we can't change it.
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r159227953 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala --- @@ -79,6 +79,7 @@ private[spark] abstract class Task[T]( SparkEnv.get.blockManager.registerTask(taskAttemptId) context = new TaskContextImpl( stageId, + stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal --- End diff -- The modification itself may not be too large (100+ occurrences in 20+ files); however, it may break the event log's JsonProtocol backward compatibility (not sure). @squito, you may have more knowledge on this since you introduced `stageAttemptId`.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r159175320 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala --- @@ -79,6 +79,7 @@ private[spark] abstract class Task[T]( SparkEnv.get.blockManager.registerTask(taskAttemptId) context = new TaskContextImpl( stageId, + stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal --- End diff -- How much work would it take to rename the internal `stageAttemptId` to `stageAttemptNumber`?
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r159038326 --- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala --- @@ -150,6 +150,13 @@ abstract class TaskContext extends Serializable { */ def stageId(): Int + /** + * An ID that is unique to the stage attempt that this task belongs to. It represents how many + * times the stage has been attempted. The first stage attempt will be assigned stageAttemptId = 0 + * , and subsequent attempts will increasing stageAttemptId one by one. + */ + def stageAttemptId(): Int --- End diff -- I have no objection to either 'id' or 'number'; they are both reasonable. I am on a train now. If there is no other input, I can rename it to `stageAttemptNumber` since you insisted.
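The numbering described in the proposed doc comment (the first attempt gets 0, and each resubmission increments it by one) can be sketched with a small stand-alone model. This is toy code, not Spark's scheduler; the names `StageAttempt` and `resubmit` are invented for illustration:

```scala
// Toy model of stage-attempt numbering (not Spark code; names invented).
// The first attempt of a stage gets attempt number 0; each resubmission
// of the same stage increments the number by one.
case class StageAttempt(stageId: Int, stageAttemptId: Int)

def resubmit(attempt: StageAttempt): StageAttempt =
  attempt.copy(stageAttemptId = attempt.stageAttemptId + 1)

val first = StageAttempt(stageId = 3, stageAttemptId = 0)
val second = resubmit(first)
val third = resubmit(second)
```

Note that the stage ID stays fixed across resubmissions; only the attempt number grows, which is exactly why the pair (stageId, stageAttemptId) identifies a stage execution.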
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r159033482 --- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala --- @@ -150,6 +150,13 @@ abstract class TaskContext extends Serializable { */ def stageId(): Int + /** + * An ID that is unique to the stage attempt that this task belongs to. It represents how many + * times the stage has been attempted. The first stage attempt will be assigned stageAttemptId = 0 + * , and subsequent attempts will increasing stageAttemptId one by one. + */ + def stageAttemptId(): Int --- End diff -- My concern is that internally we use `stageAttemptId`, and internally we call `TaskContext.taskAttemptId` `taskId`. However, end users don't know the internal code; they are more familiar with `TaskContext`. I think the naming should be consistent with the public API `TaskContext`, not with the internal code.
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158898080 --- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala --- @@ -150,6 +150,11 @@ abstract class TaskContext extends Serializable { */ def stageId(): Int + /** + * An ID that is unique to the stage attempt that this task belongs to. + */ + def stageAttemptId(): Int --- End diff -- Yeah, if we were defining `stageAttemptId` from scratch, I would go for `stageAttemptNumber`. However, `stageAttemptId` is already used elsewhere in the codebase, like in [Task.scala](https://github.com/apache/spark/blob/ded6d27e4eb02e4530015a95794e6ed0586faaa7/core/src/main/scala/org/apache/spark/scheduler/Task.scala#L55). I think it's more important to be consistent. However, I could update the comment to reflect the attempt-number semantics if you wish.
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158897767 --- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala --- @@ -42,6 +42,7 @@ import org.apache.spark.util._ */ private[spark] class TaskContextImpl( val stageId: Int, +val stageAttemptId: Int, --- End diff -- OK then.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158897401 --- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala --- @@ -150,6 +150,11 @@ abstract class TaskContext extends Serializable { */ def stageId(): Int + /** + * An ID that is unique to the stage attempt that this task belongs to. + */ + def stageAttemptId(): Int --- End diff -- I think we should call it `stageAttemptNumber` to be consistent with `taskAttemptNumber`. Also, let's follow the comment style of `attemptNumber`.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158897297 --- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala --- @@ -42,6 +42,7 @@ import org.apache.spark.util._ */ private[spark] class TaskContextImpl( val stageId: Int, +val stageAttemptId: Int, --- End diff -- It's kind of a code-style standard: add `override` when a member is an override.
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158894808 --- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala --- @@ -42,6 +42,7 @@ import org.apache.spark.util._ */ private[spark] class TaskContextImpl( val stageId: Int, +val stageAttemptId: Int, --- End diff -- Will do. Could you tell me the difference, or the rationale?
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158855520 --- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala --- @@ -42,6 +42,7 @@ import org.apache.spark.util._ */ private[spark] class TaskContextImpl( val stageId: Int, +val stageAttemptId: Int, --- End diff -- nit: add `override`. Since you are touching this file, could you also add `override` to `stageId` and `partitionId`?
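The `override` nit can be illustrated with a minimal stand-alone sketch (simplified names, not the actual Spark classes): when a constructor parameter implements an abstract member, marking it `override` makes the relationship explicit and lets the compiler flag drift between the trait and the implementation.

```scala
// Simplified sketch of the TaskContext/TaskContextImpl relationship
// (invented names; not the real Spark classes).
abstract class Context extends Serializable {
  def stageId: Int
  def partitionId: Int
}

// Constructor parameters implement the abstract members; `override`
// documents that fact and makes the compiler verify it.
class ContextImpl(
    override val stageId: Int,
    override val partitionId: Int)
  extends Context

val ctx: Context = new ContextImpl(stageId = 1, partitionId = 7)
```

Without `override`, the code still compiles while the members happen to match, but an accidental rename in the subclass would silently stop implementing the abstract member instead of producing a clear compiler error at the declaration site.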
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158774364 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala --- @@ -158,6 +159,28 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark assert(attemptIdsWithFailedTask.toSet === Set(0, 1)) } + test("TaskContext.stageAttemptId getter") { +sc = new SparkContext("local[1,2]", "test") + +// Check stage attemptIds are 0 for initial stage +val stageAttemptIds = sc.parallelize(Seq(1, 2), 2).mapPartitions { _ => + Seq(TaskContext.get().stageAttemptId()).iterator +}.collect() +assert(stageAttemptIds.toSet === Set(0)) + +// Check stage attemptIds that are resubmitted when task fails +val stageAttemptIdsWithFailedStage = + sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ => + val stageAttemptId = TaskContext.get().stageAttemptId() + if (stageAttemptId < 2) { +throw new FetchFailedException(null, 0, 0, 0, "Fake") --- End diff -- Will do.
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158774143 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala --- @@ -158,6 +159,28 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark assert(attemptIdsWithFailedTask.toSet === Set(0, 1)) } + test("TaskContext.stageAttemptId getter") { +sc = new SparkContext("local[1,2]", "test") + +// Check stage attemptIds are 0 for initial stage +val stageAttemptIds = sc.parallelize(Seq(1, 2), 2).mapPartitions { _ => + Seq(TaskContext.get().stageAttemptId()).iterator +}.collect() +assert(stageAttemptIds.toSet === Set(0)) + +// Check stage attemptIds that are resubmitted when task fails +val stageAttemptIdsWithFailedStage = + sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ => + val stageAttemptId = TaskContext.get().stageAttemptId() + if (stageAttemptId < 2) { +throw new FetchFailedException(null, 0, 0, 0, "Fake") --- End diff -- Please add a comment to explain that `FetchFailedException` will trigger a new stage attempt, while a common `Exception` will only trigger a task retry.
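The distinction between the two failure paths can be sketched with a toy state machine (invented names, not Spark's actual scheduler): a fetch failure resubmits the stage and bumps the stage attempt number, while an ordinary task failure only retries the task within the same stage attempt.

```scala
// Toy model (not Spark's scheduler) of the two failure paths discussed
// above: a fetch failure => new stage attempt; ordinary failure => task retry.
sealed trait Failure
case object FetchFailed extends Failure
case object TaskCrashed extends Failure

case class Attempts(stageAttempt: Int, taskAttempt: Int)

def handle(state: Attempts, failure: Failure): Attempts = failure match {
  // A fetch failure resubmits the whole stage: bump the stage attempt
  // number and start task attempts over from 0.
  case FetchFailed => Attempts(state.stageAttempt + 1, taskAttempt = 0)
  // Any other task failure just retries the task within the same stage attempt.
  case TaskCrashed => state.copy(taskAttempt = state.taskAttempt + 1)
}

val start      = Attempts(stageAttempt = 0, taskAttempt = 0)
val afterCrash = handle(start, TaskCrashed)      // stage attempt unchanged
val afterFetch = handle(afterCrash, FetchFailed) // new stage attempt
```

This is why the test throws `FetchFailedException` rather than a plain `Exception`: only the former exercises the stage-resubmission path that increments `stageAttemptId`.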
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158773971 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala --- @@ -158,6 +159,28 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark assert(attemptIdsWithFailedTask.toSet === Set(0, 1)) } + test("TaskContext.stageAttemptId getter") { +sc = new SparkContext("local[1,2]", "test") + +// Check stage attemptIds are 0 for initial stage +val stageAttemptIds = sc.parallelize(Seq(1, 2), 2).mapPartitions { _ => + Seq(TaskContext.get().stageAttemptId()).iterator +}.collect() +assert(stageAttemptIds.toSet === Set(0)) + +// Check stage attemptIds that are resubmitted when task fails +val stageAttemptIdsWithFailedStage = + sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ => + val stageAttemptId = TaskContext.get().stageAttemptId() + if (stageAttemptId < 2) { +throw new FetchFailedException(null, 0, 0, 0, "Fake") --- End diff -- oh, right~
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158773445 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala --- @@ -158,6 +159,28 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark assert(attemptIdsWithFailedTask.toSet === Set(0, 1)) } + test("TaskContext.stageAttemptId getter") { +sc = new SparkContext("local[1,2]", "test") + +// Check stage attemptIds are 0 for initial stage +val stageAttemptIds = sc.parallelize(Seq(1, 2), 2).mapPartitions { _ => + Seq(TaskContext.get().stageAttemptId()).iterator +}.collect() +assert(stageAttemptIds.toSet === Set(0)) + +// Check stage attemptIds that are resubmitted when task fails +val stageAttemptIdsWithFailedStage = + sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ => + val stageAttemptId = TaskContext.get().stageAttemptId() + if (stageAttemptId < 2) { +throw new FetchFailedException(null, 0, 0, 0, "Fake") --- End diff -- This is related to the `repartition` part. I use `FetchFailedException` to explicitly trigger a stage resubmission. Otherwise, the task would just be retried within the same stage, IIRC.
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158772547 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala --- @@ -158,6 +159,28 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark assert(attemptIdsWithFailedTask.toSet === Set(0, 1)) } + test("TaskContext.stageAttemptId getter") { +sc = new SparkContext("local[1,2]", "test") + +// Check stage attemptIds are 0 for initial stage +val stageAttemptIds = sc.parallelize(Seq(1, 2), 2).mapPartitions { _ => + Seq(TaskContext.get().stageAttemptId()).iterator +}.collect() +assert(stageAttemptIds.toSet === Set(0)) + +// Check stage attemptIds that are resubmitted when task fails +val stageAttemptIdsWithFailedStage = + sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ => + val stageAttemptId = TaskContext.get().stageAttemptId() + if (stageAttemptId < 2) { +throw new FetchFailedException(null, 0, 0, 0, "Fake") --- End diff -- Emmm... would just throwing an `Exception` be enough here?
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158772359 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala --- @@ -158,6 +159,28 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark assert(attemptIdsWithFailedTask.toSet === Set(0, 1)) } + test("TaskContext.stageAttemptId getter") { +sc = new SparkContext("local[1,2]", "test") + +// Check stage attemptIds are 0 for initial stage +val stageAttemptIds = sc.parallelize(Seq(1, 2), 2).mapPartitions { _ => + Seq(TaskContext.get().stageAttemptId()).iterator +}.collect() +assert(stageAttemptIds.toSet === Set(0)) + +// Check stage attemptIds that are resubmitted when task fails +val stageAttemptIdsWithFailedStage = + sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ => --- End diff -- You don't need `repartition` here, just `sc.parallelize(Seq(1, 2, 3, 4), 1).mapPartitions {...}`
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158752147 --- Diff: project/MimaExcludes.scala --- @@ -95,7 +95,10 @@ object MimaExcludes { // [SPARK-21087] CrossValidator, TrainValidationSplit expose sub models after fitting: Scala ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.CrossValidatorModel$CrossValidatorModelWriter"), - ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.TrainValidationSplitModel$TrainValidationSplitModelWriter") + ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.TrainValidationSplitModel$TrainValidationSplitModelWriter"), + +// [SPARK-22897] Expose stageAttemptId in TaskContext + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.stageAttemptId") --- End diff -- can you put it at the beginning of `v23excludes`?
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158721239 --- Diff: project/MimaExcludes.scala --- @@ -95,7 +95,10 @@ object MimaExcludes { // [SPARK-21087] CrossValidator, TrainValidationSplit expose sub models after fitting: Scala ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.CrossValidatorModel$CrossValidatorModelWriter"), - ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.TrainValidationSplitModel$TrainValidationSplitModelWriter") + ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.TrainValidationSplitModel$TrainValidationSplitModelWriter"), + +// [SPARK-22897] Expose stageAttemptId in TaskContext + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.stageAttemptId") --- End diff -- This change is suggested by https://github.com/apache/spark/pull/12248/. If it's not appropriate, please let me know.
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158699877 --- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala --- @@ -150,6 +150,10 @@ abstract class TaskContext extends Serializable { */ def stageId(): Int + /** + * The attempt ID of the stage that this task belongs to. --- End diff -- Of course, I can add more documentation. But is it necessary? I think the field (function) name is already self-explanatory.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158699180 --- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala --- @@ -150,6 +150,10 @@ abstract class TaskContext extends Serializable { */ def stageId(): Int + /** + * The attempt ID of the stage that this task belongs to. --- End diff -- Can we follow `taskAttemptId` and say more in the documentation?
GitHub user advancedxy opened a pull request: https://github.com/apache/spark/pull/20082 [SPARK-22897][CORE]: Expose stageAttemptId in TaskContext ## What changes were proposed in this pull request? Added `stageAttemptId` to `TaskContext` and modified the corresponding constructors. ## How was this patch tested? Added a new test in TaskContextSuite; two cases are tested: 1. Normal case without failure 2. Exception case with resubmitted stages Link to [SPARK-22897](https://issues.apache.org/jira/browse/SPARK-22897) You can merge this pull request into a Git repository by running: $ git pull https://github.com/advancedxy/spark SPARK-22897 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20082.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20082 commit 5753ee0d938b507c61c19233f87043396befadc5 Author: Xianjin YE Date: 2017-12-26T11:51:44Z [SPARK-22897][CORE]: Expose stageAttemptId in TaskContext