Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22473#discussion_r219575442
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -799,15 +799,21 @@ private[spark] class Executor(
if (taskRunner.task != null) {
taskRunner.task.metrics.mergeShuffleReadMetrics()
taskRunner.task.metrics.setJvmGCTime(curGCTime -
taskRunner.startGCTime)
- accumUpdates += ((taskRunner.taskId,
taskRunner.task.metrics.accumulators()))
+ val accumulatorsToReport =
+ if (conf.getBoolean(EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS.key,
true)) {
+ taskRunner.task.metrics.accumulators().filterNot(_.isZero)
+ } else {
+ taskRunner.task.metrics.accumulators()
+ }
+ accumUpdates += ((taskRunner.taskId, accumulatorsToReport))
}
}
val message = Heartbeat(executorId, accumUpdates.toArray,
env.blockManager.blockManagerId,
executorUpdates)
try {
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
- message, RpcTimeout(conf, "spark.executor.heartbeatInterval",
"10s"))
+ message, RpcTimeout(conf, EXECUTOR_HEARTBEAT_INTERVAL.key,
"10s"))
--- End diff --
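Could you add a new `apply` method to `object RpcTimeout` that accepts a
`ConfigEntry`, so this call can pass `EXECUTOR_HEARTBEAT_INTERVAL` directly
instead of its `.key` plus a hard-coded "10s" default?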
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]