sarutak commented on a change in pull request #32786:
URL: https://github.com/apache/spark/pull/32786#discussion_r646504343



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala
##########
@@ -60,16 +62,22 @@ case class CollectMetricsExec(
   override protected def doExecute(): RDD[InternalRow] = {
     val collector = accumulator
     collector.reset()
+    val updaterMap = mutable.Map[Long, AggregatingAccumulator]()

Review comment:
       This works. The problem happens on executor side so the map also should 
be on the executor side.
   But I've tried another approach by adding `addState`.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala
##########
@@ -60,16 +62,22 @@ case class CollectMetricsExec(
   override protected def doExecute(): RDD[InternalRow] = {
     val collector = accumulator
     collector.reset()
+    val updaterMap = mutable.Map[Long, AggregatingAccumulator]()
     child.execute().mapPartitions { rows =>
       // Only publish the value of the accumulator when the task has 
completed. This is done by
       // updating a task local accumulator ('updater') which will be merged 
with the actual
       // accumulator as soon as the task completes. This avoids the following 
problems during the
       // heartbeat:
       // - Correctness issues due to partially completed/visible updates.
       // - Performance issues due to excessive serialization.
-      val updater = collector.copyAndReset()
-      TaskContext.get().addTaskCompletionListener[Unit] { _ =>
-        collector.setState(updater)

Review comment:
       I've added `addState`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to