mridulm commented on a change in pull request #34607:
URL: https://github.com/apache/spark/pull/34607#discussion_r762210622
##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -600,6 +600,17 @@ private[spark] class AppStatusListener(
liveUpdate(task, now)
Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach {
stage =>
+ if (event.taskInfo.speculative) {
+ stage.speculationStageSummary = Some(stage.speculationStageSummary
+ .getOrElse(new LiveSpeculationStageSummary(event.stageId,
event.stageAttemptId)))
+ val speculationSummary = stage.speculationStageSummary.get
Review comment:
```suggestion
if (stage.speculationStageSummary.isEmpty) {
// initialize
}
val speculationSummary = stage.speculationStageSummary.get
```
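A minimal sketch of the initialize-if-empty pattern suggested above. `LiveStage` and `LiveSpeculationStageSummary` here are simplified stand-ins written for illustration, not Spark's actual classes, and `onSpeculativeTaskStart` is a hypothetical helper name.

```scala
object SpeculationInitSketch {
  // Simplified stand-in; the real class carries more counters.
  final class LiveSpeculationStageSummary(val stageId: Int, val attemptId: Int) {
    var numTasks = 0
    var numActiveTasks = 0
  }

  // Simplified stand-in holding only the optional summary.
  final class LiveStage(val stageId: Int, val attemptId: Int) {
    var speculationStageSummary: Option[LiveSpeculationStageSummary] = None
  }

  // Hypothetical helper: create the summary once, then update its counters.
  def onSpeculativeTaskStart(stage: LiveStage): LiveSpeculationStageSummary = {
    if (stage.speculationStageSummary.isEmpty) {
      // initialize
      stage.speculationStageSummary =
        Some(new LiveSpeculationStageSummary(stage.stageId, stage.attemptId))
    }
    val speculationSummary = stage.speculationStageSummary.get
    speculationSummary.numActiveTasks += 1
    speculationSummary.numTasks += 1
    speculationSummary
  }
}
```

Compared with `Some(opt.getOrElse(...))`, this form avoids re-wrapping and reassigning the field when the summary already exists.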
##########
File path: core/src/main/scala/org/apache/spark/status/storeTypes.scala
##########
@@ -399,6 +399,18 @@ private[spark] class ExecutorStageSummaryWrapper(
}
+private[spark] class SpeculationStageSummaryWrapper(
+ val stageId: Int,
+ val stageAttemptId: Int,
+ val info: SpeculationStageSummary) {
+
+ @JsonIgnore @KVIndex("stage")
+ private def stage: Array[Int] = Array(stageId, stageAttemptId)
+
Review comment:
We can drop this, and only keep `id`
##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -747,6 +758,24 @@ private[spark] class AppStatusListener(
maybeUpdate(esummary, now)
}
+ if (event.taskInfo.speculative) {
+ stage.speculationStageSummary =
Some(stage.speculationStageSummary.getOrElse(
+ new LiveSpeculationStageSummary(event.stageId,
event.stageAttemptId)))
+ val speculationStageSummary = stage.speculationStageSummary.get
+ speculationStageSummary.numActiveTasks -= 1
+ speculationStageSummary.numCompletedTasks += completedDelta
+ speculationStageSummary.numFailedTasks += failedDelta
+ speculationStageSummary.numKilledTasks += killedDelta
+ }
+
+ if(stage.speculationStageSummary.isDefined) {
+ if (isLastTask) {
+ update(stage.speculationStageSummary.get, now)
+ } else {
Review comment:
```suggestion
} else if (event.taskInfo.speculative) {
```
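A rough sketch of the branching this suggestion would produce at task end. It assumes the truncated `else` branch calls `maybeUpdate`, which the hunk above does not show. `WriteAction` and `chooseWrite` are hypothetical names used only to make the decision testable.

```scala
object TaskEndWriteSketch {
  sealed trait WriteAction
  case object ForceUpdate extends WriteAction // stands in for update(summary, now)
  case object MaybeUpdate extends WriteAction // stands in for maybeUpdate(summary, now) (assumed)
  case object NoWrite extends WriteAction

  // Decide which write, if any, the speculation summary gets on task end.
  def chooseWrite(hasSummary: Boolean, isLastTask: Boolean, speculative: Boolean): WriteAction =
    if (!hasSummary) NoWrite
    else if (isLastTask) ForceUpdate
    else if (speculative) MaybeUpdate
    else NoWrite // a non-speculative task end did not change the summary
}
```

With the extra condition, a non-speculative task end that is not the last task no longer triggers a write of an unchanged summary.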
##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -600,6 +600,17 @@ private[spark] class AppStatusListener(
liveUpdate(task, now)
Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach {
stage =>
+ if (event.taskInfo.speculative) {
+ stage.speculationStageSummary = Some(stage.speculationStageSummary
+ .getOrElse(new LiveSpeculationStageSummary(event.stageId,
event.stageAttemptId)))
+ val speculationSummary = stage.speculationStageSummary.get
+ speculationSummary.numActiveTasks += 1
+ speculationSummary.numTasks += 1
+ }
+ if(stage.speculationStageSummary.isDefined) {
+ maybeUpdate(stage.speculationStageSummary.get, now)
Review comment:
We need to do the update only when a speculative task starts, not in
other cases.
```suggestion
maybeUpdate(stage.speculationStageSummary.get, now)
```
##########
File path: core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
##########
@@ -141,6 +144,43 @@ class AppStatusStoreSuite extends SparkFunSuite {
}
}
+ test("SPARK-36038: speculation summary") {
+ val store = new InMemoryStore()
+ store.write(newSpeculationSummaryData(stageId, attemptId))
Review comment:
Keep the result of `newSpeculationSummaryData` in a variable and check
against its fields in `info.foreach` below instead of hardcoding expected
values.
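A sketch of the pattern being asked for, under stated assumptions: the case class, the `newSummary` helper, and the plain `Map` standing in for the store are all hypothetical simplifications, and the counter values are arbitrary.

```scala
object SpeculationSummaryTestSketch {
  // Stand-in for the summary data; field names follow the counters in the diff.
  final case class SpeculationStageSummary(
      numTasks: Int,
      numActiveTasks: Int,
      numCompletedTasks: Int,
      numFailedTasks: Int,
      numKilledTasks: Int)

  // Hypothetical helper playing the role of the test's data factory.
  def newSummary(): SpeculationStageSummary =
    SpeculationStageSummary(10, 2, 5, 1, 2)

  // A plain Map stands in for the key-value store used by the real test.
  def roundTrip(
      stageId: Int,
      attemptId: Int,
      summary: SpeculationStageSummary): Option[SpeculationStageSummary] = {
    val store = Map((stageId, attemptId) -> summary)
    store.get((stageId, attemptId))
  }

  def check(): Unit = {
    val expected = newSummary() // keep the written value in a variable
    val info = roundTrip(1, 0, expected)
    assert(info.isDefined)
    info.foreach { actual =>
      // Compare against the fields of `expected` rather than literal numbers.
      assert(actual.numTasks == expected.numTasks)
      assert(actual.numActiveTasks == expected.numActiveTasks)
      assert(actual.numCompletedTasks == expected.numCompletedTasks)
      assert(actual.numFailedTasks == expected.numFailedTasks)
      assert(actual.numKilledTasks == expected.numKilledTasks)
    }
  }
}
```

If the helper's values change later, the assertions stay correct without edits.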
##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -747,6 +758,24 @@ private[spark] class AppStatusListener(
maybeUpdate(esummary, now)
}
+ if (event.taskInfo.speculative) {
+ stage.speculationStageSummary =
Some(stage.speculationStageSummary.getOrElse(
+ new LiveSpeculationStageSummary(event.stageId,
event.stageAttemptId)))
Review comment:
```suggestion
if (stage.speculationStageSummary.isEmpty) {
// initialize
// Task Start was dropped - so we are going to see inconsistent
// metadata.
}
```
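One possible reading of "inconsistent metadata", sketched with simplified stand-in classes (not Spark's): if the summary is first created at task end because the start event was dropped, the active-task counter is decremented without ever having been incremented. `onSpeculativeTaskEnd` is a hypothetical helper, and the sketch assumes the task completed successfully.

```scala
object DroppedStartSketch {
  // Simplified stand-in with only the two counters needed here.
  final class LiveSpeculationStageSummary {
    var numActiveTasks = 0
    var numCompletedTasks = 0
  }

  final class LiveStage {
    var speculationStageSummary: Option[LiveSpeculationStageSummary] = None
  }

  // Hypothetical task-end handling for a successful speculative task.
  def onSpeculativeTaskEnd(stage: LiveStage): LiveSpeculationStageSummary = {
    if (stage.speculationStageSummary.isEmpty) {
      // The matching task start was never seen, so no summary exists yet.
      stage.speculationStageSummary = Some(new LiveSpeculationStageSummary)
    }
    val summary = stage.speculationStageSummary.get
    summary.numActiveTasks -= 1 // goes negative when the start was dropped
    summary.numCompletedTasks += 1
    summary
  }
}
```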
##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -600,6 +600,17 @@ private[spark] class AppStatusListener(
liveUpdate(task, now)
Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach {
stage =>
+ if (event.taskInfo.speculative) {
+ stage.speculationStageSummary = Some(stage.speculationStageSummary
+ .getOrElse(new LiveSpeculationStageSummary(event.stageId,
event.stageAttemptId)))
+ val speculationSummary = stage.speculationStageSummary.get
+ speculationSummary.numActiveTasks += 1
+ speculationSummary.numTasks += 1
+ }
+ if(stage.speculationStageSummary.isDefined) {
+ maybeUpdate(stage.speculationStageSummary.get, now)
Review comment:
Why are we special-casing the speculative execution summary compared to
other updates (to stage, executor, etc.)?
With the change below (`maybeUpdate` only when the task start is for a
speculative task), we should be fine?