mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1448069167
##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -643,6 +657,29 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
}
+ /**
+ * A simple listener that tracks task infos for all active tasks.
+ */
+ private class SaveActiveTaskInfos extends SparkListener {
+ // Use a set based on IdentityHashMap instead of a HashSet to track unique references of
+ // TaskInfo objects.
+ val taskInfos = Collections.newSetFromMap[TaskInfo](new IdentityHashMap)
+
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+ val info = taskStart.taskInfo
+ if (info != null) {
+ taskInfos.add(info)
+ }
+ }
+
+ override def onTaskEnd(task: SparkListenerTaskEnd): Unit = {
+ val info = task.taskInfo
+ if (info != null && taskInfos.contains(info)) {
+ taskInfos.remove(info)
+ }
Review Comment:
nit:
```suggestion
if (info != null) {
taskInfos.remove(info)
}
```
or even simply
```suggestion
taskInfos.remove(info)
```
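
For context, a minimal standalone sketch (hypothetical names, not part of the PR) of why the `contains` guard is redundant: `java.util.Set.remove` on an absent element is simply a no-op that returns `false`, and the `IdentityHashMap`-backed set matches on object identity rather than `equals`.
```scala
import java.util.{Collections, IdentityHashMap}

object IdentitySetSketch {
  def main(args: Array[String]): Unit = {
    // Identity-based set, same construction as in SaveActiveTaskInfos.
    val taskInfos = Collections.newSetFromMap[String](new IdentityHashMap)

    val started = new String("taskInfo")
    taskInfos.add(started)

    // An equal-but-different instance is not considered a member, and
    // removing an absent element is a harmless no-op returning false.
    val other = new String("taskInfo")
    assert(!taskInfos.remove(other))
    assert(taskInfos.size == 1)

    // Removing the exact same reference succeeds; no contains() check needed.
    assert(taskInfos.remove(started))
    assert(taskInfos.isEmpty)
  }
}
```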
##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -61,6 +61,11 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
accumUpdates: Seq[AccumulatorV2[_, _]],
metricPeaks: Array[Long],
taskInfo: TaskInfo): Unit = {
+ accumUpdates.foreach(acc =>
+ taskInfo.setAccumulables(
+ acc.toInfo(Some(acc.value), Some(acc.value)) +: taskInfo.accumulables)
+ )
+ taskScheduler.endedTasks(taskInfo.index) = reason
Review Comment:
Duplicate?
```suggestion
```
##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -61,6 +61,11 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
accumUpdates: Seq[AccumulatorV2[_, _]],
metricPeaks: Array[Long],
taskInfo: TaskInfo): Unit = {
+ accumUpdates.foreach(acc =>
+ taskInfo.setAccumulables(
+ acc.toInfo(Some(acc.value), Some(acc.value)) +: taskInfo.accumulables)
+ )
Review Comment:
Add a comment on why we need this?
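
One possible wording for such a comment (only a sketch; it assumes the intent here is to mimic how the real DAGScheduler folds a task's accumulator updates into `TaskInfo.accumulables`, so that tests built on `FakeDAGScheduler` can still assert on accumulables):
```scala
// Mimic DAGScheduler, which converts each task's accumulator updates into
// AccumulableInfo entries on the TaskInfo, so that tests going through
// FakeDAGScheduler can still observe taskInfo.accumulables.
accumUpdates.foreach(acc =>
  taskInfo.setAccumulables(
    acc.toInfo(Some(acc.value), Some(acc.value)) +: taskInfo.accumulables)
)
```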
##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -289,6 +290,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
stageInfo.rddInfos.forall(_.numPartitions == 4) should be {true}
}
+ test("SPARK-46383: Track TaskInfo objects") {
+ val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+ sc = new SparkContext("local", "SparkListenerSuite", conf)
+ val listener = new SaveActiveTaskInfos
+ sc.addSparkListener(listener)
+ val rdd1 = sc.parallelize(1 to 100, 4)
+ sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1))
+ sc.listenerBus.waitUntilEmpty()
+ listener.taskInfos.size should be { 0 }
Review Comment:
Functionally, that (the right task info being in the event) should already be covered (for example, via the use of `SaveStageAndTaskInfo`). Do let me know if that is not the case.