sarutak commented on a change in pull request #32786:
URL: https://github.com/apache/spark/pull/32786#discussion_r646504343
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala
##########
@@ -60,16 +62,22 @@ case class CollectMetricsExec(
override protected def doExecute(): RDD[InternalRow] = {
val collector = accumulator
collector.reset()
+ val updaterMap = mutable.Map[Long, AggregatingAccumulator]()
Review comment:
This works. The problem happens on the executor side, so the map should also be on the executor side.
But I've tried another approach by adding `addState`.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala
##########
@@ -60,16 +62,22 @@ case class CollectMetricsExec(
override protected def doExecute(): RDD[InternalRow] = {
val collector = accumulator
collector.reset()
+ val updaterMap = mutable.Map[Long, AggregatingAccumulator]()
child.execute().mapPartitions { rows =>
// Only publish the value of the accumulator when the task has completed. This is done by
// updating a task local accumulator ('updater') which will be merged with the actual
// accumulator as soon as the task completes. This avoids the following problems during the
// heartbeat:
// - Correctness issues due to partially completed/visible updates.
// - Performance issues due to excessive serialization.
- val updater = collector.copyAndReset()
- TaskContext.get().addTaskCompletionListener[Unit] { _ =>
- collector.setState(updater)
Review comment:
I've added `addState`.
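For context, here is a minimal, self-contained sketch of the pattern being discussed: each task works against a task-local copy of the accumulator and only publishes it to the registered accumulator from a task-completion listener, so heartbeats never report partial updates. The `LongSumAccumulator` and `TaskLocalUpdaterSketch` names are illustrative only, and plain `merge` stands in for the `AggregatingAccumulator`-specific `setState`/`addState` used in `CollectMetricsExec` and this PR.

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.util.AccumulatorV2

// Simplified stand-in for AggregatingAccumulator: it just sums Longs.
class LongSumAccumulator extends AccumulatorV2[Long, Long] {
  private var sum = 0L
  override def isZero: Boolean = sum == 0L
  override def copy(): LongSumAccumulator = {
    val acc = new LongSumAccumulator
    acc.sum = sum
    acc
  }
  override def reset(): Unit = sum = 0L
  override def add(v: Long): Unit = sum += v
  override def merge(other: AccumulatorV2[Long, Long]): Unit = sum += other.value
  override def value: Long = sum
}

object TaskLocalUpdaterSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val collector = new LongSumAccumulator
    sc.register(collector, "rowCount")

    sc.parallelize(1L to 1000L, 4).mapPartitions { rows =>
      // Task-local copy created on the executor; it is not registered, so
      // heartbeats never see its partial value.
      val updater = collector.copyAndReset()
      TaskContext.get().addTaskCompletionListener[Unit] { _ =>
        // Publish the task's contribution in a single step once the task completes.
        collector.merge(updater)
      }
      rows.map { row => updater.add(1L); row }
    }.count()

    println(s"rows counted: ${collector.value}") // expected: 1000
    sc.stop()
  }
}
```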