Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20013#discussion_r160478184
  
    --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
    @@ -119,118 +121,115 @@ private class LiveTask(
     
       import LiveEntityHelpers._
     
    -  private var recordedMetrics: v1.TaskMetrics = null
    +  private var metrics: MetricsTracker = new MetricsTracker()
     
       var errorMessage: Option[String] = None
     
       /**
        * Update the metrics for the task and return the difference between the 
previous and new
        * values.
        */
    -  def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = {
    +  def updateMetrics(metrics: TaskMetrics): MetricsTracker = {
         if (metrics != null) {
    -      val old = recordedMetrics
    -      recordedMetrics = new v1.TaskMetrics(
    -        metrics.executorDeserializeTime,
    -        metrics.executorDeserializeCpuTime,
    -        metrics.executorRunTime,
    -        metrics.executorCpuTime,
    -        metrics.resultSize,
    -        metrics.jvmGCTime,
    -        metrics.resultSerializationTime,
    -        metrics.memoryBytesSpilled,
    -        metrics.diskBytesSpilled,
    -        metrics.peakExecutionMemory,
    -        new v1.InputMetrics(
    -          metrics.inputMetrics.bytesRead,
    -          metrics.inputMetrics.recordsRead),
    -        new v1.OutputMetrics(
    -          metrics.outputMetrics.bytesWritten,
    -          metrics.outputMetrics.recordsWritten),
    -        new v1.ShuffleReadMetrics(
    -          metrics.shuffleReadMetrics.remoteBlocksFetched,
    -          metrics.shuffleReadMetrics.localBlocksFetched,
    -          metrics.shuffleReadMetrics.fetchWaitTime,
    -          metrics.shuffleReadMetrics.remoteBytesRead,
    -          metrics.shuffleReadMetrics.remoteBytesReadToDisk,
    -          metrics.shuffleReadMetrics.localBytesRead,
    -          metrics.shuffleReadMetrics.recordsRead),
    -        new v1.ShuffleWriteMetrics(
    -          metrics.shuffleWriteMetrics.bytesWritten,
    -          metrics.shuffleWriteMetrics.writeTime,
    -          metrics.shuffleWriteMetrics.recordsWritten))
    -      if (old != null) calculateMetricsDelta(recordedMetrics, old) else 
recordedMetrics
    +      val old = this.metrics
    +      val newMetrics = new MetricsTracker()
    +      newMetrics.executorDeserializeTime = metrics.executorDeserializeTime
    +      newMetrics.executorDeserializeCpuTime = 
metrics.executorDeserializeCpuTime
    +      newMetrics.executorRunTime = metrics.executorRunTime
    +      newMetrics.executorCpuTime = metrics.executorCpuTime
    +      newMetrics.resultSize = metrics.resultSize
    +      newMetrics.jvmGcTime = metrics.jvmGCTime
    +      newMetrics.resultSerializationTime = metrics.resultSerializationTime
    +      newMetrics.memoryBytesSpilled = metrics.memoryBytesSpilled
    +      newMetrics.diskBytesSpilled = metrics.diskBytesSpilled
    +      newMetrics.peakExecutionMemory = metrics.peakExecutionMemory
    +      newMetrics.inputBytesRead = metrics.inputMetrics.bytesRead
    +      newMetrics.inputRecordsRead = metrics.inputMetrics.recordsRead
    +      newMetrics.outputBytesWritten = metrics.outputMetrics.bytesWritten
    +      newMetrics.outputRecordsWritten = 
metrics.outputMetrics.recordsWritten
    +      newMetrics.shuffleRemoteBlocksFetched = 
metrics.shuffleReadMetrics.remoteBlocksFetched
    +      newMetrics.shuffleLocalBlocksFetched = 
metrics.shuffleReadMetrics.localBlocksFetched
    +      newMetrics.shuffleFetchWaitTime = 
metrics.shuffleReadMetrics.fetchWaitTime
    +      newMetrics.shuffleRemoteBytesRead = 
metrics.shuffleReadMetrics.remoteBytesRead
    +      newMetrics.shuffleRemoteBytesReadToDisk = 
metrics.shuffleReadMetrics.remoteBytesReadToDisk
    +      newMetrics.shuffleLocalBytesRead = 
metrics.shuffleReadMetrics.localBytesRead
    +      newMetrics.shuffleRecordsRead = 
metrics.shuffleReadMetrics.recordsRead
    +      newMetrics.shuffleBytesWritten = 
metrics.shuffleWriteMetrics.bytesWritten
    +      newMetrics.shuffleWriteTime = metrics.shuffleWriteMetrics.writeTime
    +      newMetrics.shuffleRecordsWritten = 
metrics.shuffleWriteMetrics.recordsWritten
    +
    +      this.metrics = newMetrics
    +      if (old.executorDeserializeTime >= 0L) {
    +        old.subtract(newMetrics)
    --- End diff --
    
    No, this is right. The parameter is actually a delta, so it has to be 
applied to the old value, except when the old value is not initialized (which 
is the condition in L162).
    
    But let me double-check this again.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to