Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20013#discussion_r160574781
  
    --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
    @@ -141,96 +145,95 @@ private class LiveTask(
             metrics.memoryBytesSpilled,
             metrics.diskBytesSpilled,
             metrics.peakExecutionMemory,
    -        new v1.InputMetrics(
    -          metrics.inputMetrics.bytesRead,
    -          metrics.inputMetrics.recordsRead),
    -        new v1.OutputMetrics(
    -          metrics.outputMetrics.bytesWritten,
    -          metrics.outputMetrics.recordsWritten),
    -        new v1.ShuffleReadMetrics(
    -          metrics.shuffleReadMetrics.remoteBlocksFetched,
    -          metrics.shuffleReadMetrics.localBlocksFetched,
    -          metrics.shuffleReadMetrics.fetchWaitTime,
    -          metrics.shuffleReadMetrics.remoteBytesRead,
    -          metrics.shuffleReadMetrics.remoteBytesReadToDisk,
    -          metrics.shuffleReadMetrics.localBytesRead,
    -          metrics.shuffleReadMetrics.recordsRead),
    -        new v1.ShuffleWriteMetrics(
    -          metrics.shuffleWriteMetrics.bytesWritten,
    -          metrics.shuffleWriteMetrics.writeTime,
    -          metrics.shuffleWriteMetrics.recordsWritten))
    -      if (old != null) calculateMetricsDelta(recordedMetrics, old) else 
recordedMetrics
    +        metrics.inputMetrics.bytesRead,
    +        metrics.inputMetrics.recordsRead,
    +        metrics.outputMetrics.bytesWritten,
    +        metrics.outputMetrics.recordsWritten,
    +        metrics.shuffleReadMetrics.remoteBlocksFetched,
    +        metrics.shuffleReadMetrics.localBlocksFetched,
    +        metrics.shuffleReadMetrics.fetchWaitTime,
    +        metrics.shuffleReadMetrics.remoteBytesRead,
    +        metrics.shuffleReadMetrics.remoteBytesReadToDisk,
    +        metrics.shuffleReadMetrics.localBytesRead,
    +        metrics.shuffleReadMetrics.recordsRead,
    +        metrics.shuffleWriteMetrics.bytesWritten,
    +        metrics.shuffleWriteMetrics.writeTime,
    +        metrics.shuffleWriteMetrics.recordsWritten)
    +
    +      this.metrics = newMetrics
    +
    +      // Only calculate the delta if the old metrics contain valid 
information, otherwise
    +      // the new metrics are the delta.
    +      if (old.executorDeserializeTime >= 0L) {
    +        newMetrics.subtract(old)
    +      } else {
    +        newMetrics
    +      }
         } else {
           null
         }
       }
     
    -  /**
    -   * Return a new TaskMetrics object containing the delta of the various 
fields of the given
    -   * metrics objects. This is currently targeted at updating stage data, 
so it does not
    -   * necessarily calculate deltas for all the fields.
    -   */
    -  private def calculateMetricsDelta(
    -      metrics: v1.TaskMetrics,
    -      old: v1.TaskMetrics): v1.TaskMetrics = {
    -    val shuffleWriteDelta = new v1.ShuffleWriteMetrics(
    -      metrics.shuffleWriteMetrics.bytesWritten - 
old.shuffleWriteMetrics.bytesWritten,
    -      0L,
    -      metrics.shuffleWriteMetrics.recordsWritten - 
old.shuffleWriteMetrics.recordsWritten)
    -
    -    val shuffleReadDelta = new v1.ShuffleReadMetrics(
    -      0L, 0L, 0L,
    -      metrics.shuffleReadMetrics.remoteBytesRead - 
old.shuffleReadMetrics.remoteBytesRead,
    -      metrics.shuffleReadMetrics.remoteBytesReadToDisk -
    -        old.shuffleReadMetrics.remoteBytesReadToDisk,
    -      metrics.shuffleReadMetrics.localBytesRead - 
old.shuffleReadMetrics.localBytesRead,
    -      metrics.shuffleReadMetrics.recordsRead - 
old.shuffleReadMetrics.recordsRead)
    -
    -    val inputDelta = new v1.InputMetrics(
    -      metrics.inputMetrics.bytesRead - old.inputMetrics.bytesRead,
    -      metrics.inputMetrics.recordsRead - old.inputMetrics.recordsRead)
    -
    -    val outputDelta = new v1.OutputMetrics(
    -      metrics.outputMetrics.bytesWritten - old.outputMetrics.bytesWritten,
    -      metrics.outputMetrics.recordsWritten - 
old.outputMetrics.recordsWritten)
    -
    -    new v1.TaskMetrics(
    -      0L, 0L,
    -      metrics.executorRunTime - old.executorRunTime,
    -      metrics.executorCpuTime - old.executorCpuTime,
    -      0L, 0L, 0L,
    -      metrics.memoryBytesSpilled - old.memoryBytesSpilled,
    -      metrics.diskBytesSpilled - old.diskBytesSpilled,
    -      0L,
    -      inputDelta,
    -      outputDelta,
    -      shuffleReadDelta,
    -      shuffleWriteDelta)
    -  }
    -
    -  override protected def doUpdate(): Any = {
    +  private def buildUpdate(): TaskDataWrapper = {
         val duration = if (info.finished) {
           info.duration
         } else {
           
info.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis()))
         }
     
    -    val task = new v1.TaskData(
    +    new TaskDataWrapper(
           info.taskId,
           info.index,
           info.attemptNumber,
    -      new Date(info.launchTime),
    -      if (info.gettingResult) Some(new Date(info.gettingResultTime)) else 
None,
    -      Some(duration),
    -      info.executorId,
    -      info.host,
    -      info.status,
    -      info.taskLocality.toString(),
    +      info.launchTime,
    +      if (info.gettingResult) info.gettingResultTime else -1L,
    +      duration,
    +      weakIntern(info.executorId),
    +      weakIntern(info.host),
    +      weakIntern(info.status),
    +      weakIntern(info.taskLocality.toString()),
           info.speculative,
           newAccumulatorInfos(info.accumulables),
           errorMessage,
    -      Option(recordedMetrics))
    -    new TaskDataWrapper(task, stageId, stageAttemptId)
    +
    +      metrics.executorDeserializeTime,
    +      metrics.executorDeserializeCpuTime,
    +      metrics.executorRunTime,
    +      metrics.executorCpuTime,
    +      metrics.resultSize,
    +      metrics.jvmGcTime,
    +      metrics.resultSerializationTime,
    +      metrics.memoryBytesSpilled,
    +      metrics.diskBytesSpilled,
    +      metrics.peakExecutionMemory,
    +      metrics.inputBytesRead,
    +      metrics.inputRecordsRead,
    +      metrics.outputBytesWritten,
    +      metrics.outputRecordsWritten,
    +      metrics.shuffleRemoteBlocksFetched,
    +      metrics.shuffleLocalBlocksFetched,
    +      metrics.shuffleFetchWaitTime,
    +      metrics.shuffleRemoteBytesRead,
    +      metrics.shuffleRemoteBytesReadToDisk,
    +      metrics.shuffleLocalBytesRead,
    +      metrics.shuffleRecordsRead,
    +      metrics.shuffleBytesWritten,
    +      metrics.shuffleWriteTime,
    +      metrics.shuffleRecordsWritten,
    +
    +      stageId,
    +      stageAttemptId)
    +  }
    +
    +  override protected def doUpdate(): Any = {
    +    buildUpdate()
    +  }
    +
    +  def updateAndGet(kvstore: KVStore, now: Long): TaskDataWrapper = {
    --- End diff --
    
    why do we need it? Since the caller side doesn't care about the return 
value, we can just update the `lastWriteTime` in `doUpdate`, and then call 
`doUpdate` instead.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to