cloud-fan commented on a change in pull request #32786:
URL: https://github.com/apache/spark/pull/32786#discussion_r646427719



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala
##########
@@ -60,16 +62,22 @@ case class CollectMetricsExec(
   override protected def doExecute(): RDD[InternalRow] = {
     val collector = accumulator
     collector.reset()
+    val updaterMap = mutable.Map[Long, AggregatingAccumulator]()
     child.execute().mapPartitions { rows =>
       // Only publish the value of the accumulator when the task has completed. This is done by
       // updating a task local accumulator ('updater') which will be merged with the actual
       // accumulator as soon as the task completes. This avoids the following problems during the
       // heartbeat:
       // - Correctness issues due to partially completed/visible updates.
       // - Performance issues due to excessive serialization.
-      val updater = collector.copyAndReset()
-      TaskContext.get().addTaskCompletionListener[Unit] { _ =>
-        collector.setState(updater)

Review comment:
If a task needs to handle more than one partition, we need to merge the
task-local accumulators before sending them back to the driver.

It looks to me like we need a `collector.addState`.
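
To illustrate the concern, here is a minimal sketch in plain Scala with no Spark dependency. `SumAccumulator` is a toy stand-in, not Spark's `AggregatingAccumulator`, and `addState` is the hypothetical merging method suggested above; `runTask` only simulates a single task that processes several partitions, each with its own updater.

```scala
// Toy stand-in for an accumulator; NOT Spark's AggregatingAccumulator.
final class SumAccumulator(private var total: Long = 0L) {
  def add(v: Long): Unit = { total += v }
  def value: Long = total
  // Returns a fresh, zeroed accumulator to use as a task-local updater.
  def copyAndReset(): SumAccumulator = new SumAccumulator()
  // Overwrites the current state with the other accumulator's state.
  def setState(other: SumAccumulator): Unit = { total = other.value }
  // Hypothetical: merges the other accumulator's state into this one.
  def addState(other: SumAccumulator): Unit = { total += other.value }
}

object MergeSketch {
  // Simulates one task processing several partitions. Each partition gets its
  // own updater, and `publish` runs once per updater when the task completes.
  def runTask(partitions: Seq[Seq[Long]])(
      publish: (SumAccumulator, SumAccumulator) => Unit): Long = {
    val collector = new SumAccumulator()
    val updaters = partitions.map { rows =>
      val updater = collector.copyAndReset()
      rows.foreach(updater.add)
      updater
    }
    updaters.foreach(u => publish(collector, u))
    collector.value
  }

  val partitions: Seq[Seq[Long]] = Seq(Seq(1L, 2L), Seq(3L, 4L))

  // Overwriting: only the last partition's updater survives.
  val withSetState: Long = runTask(partitions)(_.setState(_))

  // Merging: every partition's updater contributes to the result.
  val withAddState: Long = runTask(partitions)(_.addState(_))
}
```

In this toy setup, publishing with `setState` keeps only the state of the last partition handled by the task, while a merging `addState` keeps the contribution of every partition.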





