Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/18008#discussion_r116876355
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
---
@@ -200,32 +202,47 @@ class TaskMetrics private[spark] () extends
Serializable {
import InternalAccumulator._
- @transient private[spark] lazy val nameToAccums = LinkedHashMap(
- EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
- EXECUTOR_DESERIALIZE_CPU_TIME -> _executorDeserializeCpuTime,
- EXECUTOR_RUN_TIME -> _executorRunTime,
- EXECUTOR_CPU_TIME -> _executorCpuTime,
- RESULT_SIZE -> _resultSize,
- JVM_GC_TIME -> _jvmGCTime,
- RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
- MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
- DISK_BYTES_SPILLED -> _diskBytesSpilled,
- PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
- UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
- shuffleRead.REMOTE_BLOCKS_FETCHED ->
shuffleReadMetrics._remoteBlocksFetched,
- shuffleRead.LOCAL_BLOCKS_FETCHED ->
shuffleReadMetrics._localBlocksFetched,
- shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
- shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
- shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
- shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
- shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten,
- shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten,
- shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,
- input.BYTES_READ -> inputMetrics._bytesRead,
- input.RECORDS_READ -> inputMetrics._recordsRead,
- output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
- output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
- ) ++ testAccum.map(TEST_ACCUM -> _)
+ @transient private[spark] lazy val nameToAccums = {
+ // The construction of this map is a performance hotspot in the
JobProgressListener, so we
+ // optimize this by using a pre-sized Java hashmap; see SPARK-20776
for more details.
+ val mapEntries = Array[(String, AccumulatorV2[_, _])](
+ EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
+ EXECUTOR_DESERIALIZE_CPU_TIME -> _executorDeserializeCpuTime,
+ EXECUTOR_RUN_TIME -> _executorRunTime,
+ EXECUTOR_CPU_TIME -> _executorCpuTime,
+ RESULT_SIZE -> _resultSize,
+ JVM_GC_TIME -> _jvmGCTime,
+ RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
+ MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
+ DISK_BYTES_SPILLED -> _diskBytesSpilled,
+ PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
+ UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
+ shuffleRead.REMOTE_BLOCKS_FETCHED ->
shuffleReadMetrics._remoteBlocksFetched,
+ shuffleRead.LOCAL_BLOCKS_FETCHED ->
shuffleReadMetrics._localBlocksFetched,
+ shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
+ shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
+ shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
+ shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
+ shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten,
+ shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten,
+ shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,
+ input.BYTES_READ -> inputMetrics._bytesRead,
+ input.RECORDS_READ -> inputMetrics._recordsRead,
+ output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
+ output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
+ )
+ val map = Maps.newHashMapWithExpectedSize[String, AccumulatorV2[_,
_]](mapEntries.length)
+ var i = 0
+ while (i < mapEntries.length) {
+ val e = mapEntries(i)
+ map.put(e._1, e._2)
+ i += 1
+ }
+ testAccum.foreach { accum =>
+ map.put(TEST_ACCUM, accum)
+ }
+ map.asScala
--- End diff --
The map + wrapper might consume a little extra memory compared to the old
code, but that doesn't matter because we don't have many `TaskMetrics`
instances resident in the JVM at the same time: in the executor, the only
instances live in TaskContexts, and in the driver you only have one per stage
in the scheduler plus some temporary ones in the listener bus queue, which are
freed as soon as the queue events are processed (and that processing now
happens faster, outweighing the extra space usage).
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]