GitHub user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18008#discussion_r116876355
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -200,32 +202,47 @@ class TaskMetrics private[spark] () extends Serializable {
     
     
       import InternalAccumulator._
    -  @transient private[spark] lazy val nameToAccums = LinkedHashMap(
    -    EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
    -    EXECUTOR_DESERIALIZE_CPU_TIME -> _executorDeserializeCpuTime,
    -    EXECUTOR_RUN_TIME -> _executorRunTime,
    -    EXECUTOR_CPU_TIME -> _executorCpuTime,
    -    RESULT_SIZE -> _resultSize,
    -    JVM_GC_TIME -> _jvmGCTime,
    -    RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
    -    MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
    -    DISK_BYTES_SPILLED -> _diskBytesSpilled,
    -    PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
    -    UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
    -    shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
    -    shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
    -    shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
    -    shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
    -    shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
    -    shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
    -    shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten,
    -    shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten,
    -    shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,
    -    input.BYTES_READ -> inputMetrics._bytesRead,
    -    input.RECORDS_READ -> inputMetrics._recordsRead,
    -    output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
    -    output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
    -  ) ++ testAccum.map(TEST_ACCUM -> _)
    +  @transient private[spark] lazy val nameToAccums = {
    +    // The construction of this map is a performance hotspot in the JobProgressListener, so we
    +    // optimize this by using a pre-sized Java hashmap; see SPARK-20776 for more details.
    +    val mapEntries = Array[(String, AccumulatorV2[_, _])](
    +      EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
    +      EXECUTOR_DESERIALIZE_CPU_TIME -> _executorDeserializeCpuTime,
    +      EXECUTOR_RUN_TIME -> _executorRunTime,
    +      EXECUTOR_CPU_TIME -> _executorCpuTime,
    +      RESULT_SIZE -> _resultSize,
    +      JVM_GC_TIME -> _jvmGCTime,
    +      RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
    +      MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
    +      DISK_BYTES_SPILLED -> _diskBytesSpilled,
    +      PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
    +      UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
    +      shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
    +      shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
    +      shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
    +      shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
    +      shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
    +      shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
    +      shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten,
    +      shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten,
    +      shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,
    +      input.BYTES_READ -> inputMetrics._bytesRead,
    +      input.RECORDS_READ -> inputMetrics._recordsRead,
    +      output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
    +      output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
    +    )
    +    val map = Maps.newHashMapWithExpectedSize[String, AccumulatorV2[_, _]](mapEntries.length)
    +    var i = 0
    +    while (i < mapEntries.length) {
    +      val e = mapEntries(i)
    +      map.put(e._1, e._2)
    +      i += 1
    +    }
    +    testAccum.foreach { accum =>
    +      map.put(TEST_ACCUM, accum)
    +    }
    +    map.asScala
    --- End diff --
    
    The map plus the `asScala` wrapper may consume a little extra memory compared
    to the old code, but that doesn't matter because we never have many
    `TaskMetrics` instances resident in the JVM at once: on the executor, the only
    instances live in `TaskContext`s, and on the driver there is just one per stage
    in the scheduler plus some temporary ones in the listener bus queue, which are
    freed as soon as the queue events are processed (and that processing is faster
    now, outweighing the extra space usage).
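    
    For reference, here is a minimal, self-contained sketch of the pattern this
    diff adopts: pre-size a Java `HashMap` through Guava so that filling it never
    triggers a rehash, then hand Scala callers a live `asScala` view. The entries
    and names below are placeholders for illustration, not the real accumulator
    fields:
    
        import scala.collection.JavaConverters._
        import com.google.common.collect.Maps
    
        object PreSizedMapSketch {
          def main(args: Array[String]): Unit = {
            // Placeholder pairs standing in for the (name -> accumulator) entries above.
            val entries = Array("a" -> 1, "b" -> 2, "c" -> 3)
    
            // Guava sizes the backing table so that `entries.length` puts never
            // resize it; a plain `new java.util.HashMap()` starts at capacity 16
            // and grows as elements are added.
            val map = Maps.newHashMapWithExpectedSize[String, Int](entries.length)
    
            // The while loop avoids the per-element closure allocation of
            // `foreach`, matching the hot-path style used in the diff.
            var i = 0
            while (i < entries.length) {
              val e = entries(i)
              map.put(e._1, e._2)
              i += 1
            }
    
            // `asScala` wraps rather than copies: Scala callers read through to
            // the Java map, which is where the small extra wrapper memory comes from.
            val scalaView: scala.collection.mutable.Map[String, Int] = map.asScala
            println(scalaView) // prints the three entries; HashMap iteration order is unspecified
          }
        }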

