Github user carsonwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7399#discussion_r34865990
  
    --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
    @@ -672,164 +658,605 @@ private[ui] class StagePage(parent: StagesTab) 
extends WebUIPage("stage") {
         </script>
       }
     
    -  def taskRow(
    -      hasAccumulators: Boolean,
    -      hasInput: Boolean,
    -      hasOutput: Boolean,
    -      hasShuffleRead: Boolean,
    -      hasShuffleWrite: Boolean,
    -      hasBytesSpilled: Boolean,
    -      currentTime: Long)(taskData: TaskUIData): Seq[Node] = {
    -    taskData match { case TaskUIData(info, metrics, errorMessage) =>
    -      val duration = if (info.status == "RUNNING") 
info.timeRunning(currentTime)
    -        else metrics.map(_.executorRunTime).getOrElse(1L)
    -      val formatDuration = if (info.status == "RUNNING") 
UIUtils.formatDuration(duration)
    -        else metrics.map(m => 
UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
    -      val schedulerDelay = metrics.map(getSchedulerDelay(info, _, 
currentTime)).getOrElse(0L)
    -      val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
    -      val taskDeserializationTime = 
metrics.map(_.executorDeserializeTime).getOrElse(0L)
    -      val serializationTime = 
metrics.map(_.resultSerializationTime).getOrElse(0L)
    -      val gettingResultTime = getGettingResultTime(info, currentTime)
    -
    -      val maybeAccumulators = info.accumulables
    -      val accumulatorsReadable = maybeAccumulators.map { acc =>
    -        StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")
    +}
    +
    +private[ui] object StagePage {
    +  private[ui] def getGettingResultTime(info: TaskInfo, currentTime: Long): 
Long = {
    +    if (info.gettingResult) {
    +      if (info.finished) {
    +        info.finishTime - info.gettingResultTime
    +      } else {
    +        // The task is still fetching the result.
    +        currentTime - info.gettingResultTime
           }
    +    } else {
    +      0L
    +    }
    +  }
     
    -      val maybeInput = metrics.flatMap(_.inputMetrics)
    -      val inputSortable = 
maybeInput.map(_.bytesRead.toString).getOrElse("")
    -      val inputReadable = maybeInput
    -        .map(m => s"${Utils.bytesToString(m.bytesRead)} 
(${m.readMethod.toString.toLowerCase()})")
    -        .getOrElse("")
    -      val inputRecords = 
maybeInput.map(_.recordsRead.toString).getOrElse("")
    -
    -      val maybeOutput = metrics.flatMap(_.outputMetrics)
    -      val outputSortable = 
maybeOutput.map(_.bytesWritten.toString).getOrElse("")
    -      val outputReadable = maybeOutput
    -        .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
    -        .getOrElse("")
    -      val outputRecords = 
maybeOutput.map(_.recordsWritten.toString).getOrElse("")
    -
    -      val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics)
    -      val shuffleReadBlockedTimeSortable = maybeShuffleRead
    -        .map(_.fetchWaitTime.toString).getOrElse("")
    -      val shuffleReadBlockedTimeReadable =
    -        maybeShuffleRead.map(ms => 
UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
    -
    -      val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
    -      val shuffleReadSortable = 
totalShuffleBytes.map(_.toString).getOrElse("")
    -      val shuffleReadReadable = 
totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
    -      val shuffleReadRecords = 
maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
    -
    -      val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
    -      val shuffleReadRemoteSortable = 
remoteShuffleBytes.map(_.toString).getOrElse("")
    -      val shuffleReadRemoteReadable = 
remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
    -
    -      val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
    -      val shuffleWriteSortable = 
maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
    -      val shuffleWriteReadable = maybeShuffleWrite
    -        .map(m => 
s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
    -      val shuffleWriteRecords = maybeShuffleWrite
    -        .map(_.shuffleRecordsWritten.toString).getOrElse("")
    -
    -      val maybeWriteTime = 
metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
    -      val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
    -      val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 
1000)).map { ms =>
    -        if (ms == 0) "" else UIUtils.formatDuration(ms)
    -      }.getOrElse("")
    -
    -      val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
    -      val memoryBytesSpilledSortable = 
maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
    -      val memoryBytesSpilledReadable =
    -        maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
    -
    -      val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled)
    -      val diskBytesSpilledSortable = 
maybeDiskBytesSpilled.map(_.toString).getOrElse("")
    -      val diskBytesSpilledReadable = 
maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
    -
    -      <tr>
    -        <td>{info.index}</td>
    -        <td>{info.taskId}</td>
    -        <td sorttable_customkey={info.attempt.toString}>{
    -          if (info.speculative) s"${info.attempt} (speculative)" else 
info.attempt.toString
    -        }</td>
    -        <td>{info.status}</td>
    -        <td>{info.taskLocality}</td>
    -        <td>{info.executorId} / {info.host}</td>
    -        <td>{UIUtils.formatDate(new Date(info.launchTime))}</td>
    -        <td sorttable_customkey={duration.toString}>
    -          {formatDuration}
    -        </td>
    -        <td sorttable_customkey={schedulerDelay.toString}
    -            class={TaskDetailsClassNames.SCHEDULER_DELAY}>
    -          {UIUtils.formatDuration(schedulerDelay.toLong)}
    -        </td>
    -        <td sorttable_customkey={taskDeserializationTime.toString}
    -            class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
    -          {UIUtils.formatDuration(taskDeserializationTime.toLong)}
    -        </td>
    -        <td sorttable_customkey={gcTime.toString}>
    -          {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
    -        </td>
    -        <td sorttable_customkey={serializationTime.toString}
    -            class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
    -          {UIUtils.formatDuration(serializationTime)}
    -        </td>
    -        <td sorttable_customkey={gettingResultTime.toString}
    -            class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
    -          {UIUtils.formatDuration(gettingResultTime)}
    -        </td>
    -        {if (hasAccumulators) {
    -          <td>
    -            {Unparsed(accumulatorsReadable.mkString("<br/>"))}
    -          </td>
    -        }}
    -        {if (hasInput) {
    -          <td sorttable_customkey={inputSortable}>
    -            {s"$inputReadable / $inputRecords"}
    -          </td>
    -        }}
    -        {if (hasOutput) {
    -          <td sorttable_customkey={outputSortable}>
    -            {s"$outputReadable / $outputRecords"}
    -          </td>
    -        }}
    +  private[ui] def getSchedulerDelay(
    +      info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = {
    +    if (info.finished) {
    +      val totalExecutionTime = info.finishTime - info.launchTime
    +      val executorOverhead = (metrics.executorDeserializeTime +
    +        metrics.resultSerializationTime)
    +      math.max(
    +        0,
    +        totalExecutionTime - metrics.executorRunTime - executorOverhead -
    +          getGettingResultTime(info, currentTime))
    +    } else {
    +      // The task is still running and the metrics like executorRunTime 
are not available.
    +      0L
    +    }
    +  }
    +}
    +
    +private[ui] case class TaskTableRowInputData(inputSortable: String, 
inputReadable: String)
    +
    +private[ui] case class TaskTableRowOutputData(outputSortable: String, 
outputReadable: String)
    +
    +private[ui] case class TaskTableRowShuffleReadData(
    +    shuffleReadBlockedTimeSortable: String,
    +    shuffleReadBlockedTimeReadable: String,
    +    shuffleReadSortable: String,
    +    shuffleReadReadable: String,
    +    shuffleReadRemoteSortable: String,
    +    shuffleReadRemoteReadable: String)
    +
    +private[ui] case class TaskTableRowShuffleWriteData(
    +    writeTimeSortable: String,
    +    writeTimeReadable: String,
    +    shuffleWriteSortable: String,
    +    shuffleWriteReadable: String)
    +
    +private[ui] case class TaskTableRowBytesSpilledData(
    +    memoryBytesSpilledSortable: String,
    +    memoryBytesSpilledReadable: String,
    +    diskBytesSpilledSortable: String,
    +    diskBytesSpilledReadable: String)
    +
    +/**
    + * Contains all data that needs for sorting and generating HTML. Using 
this one rather than
    + * TaskUIData to avoid creating duplicate contents during sorting the data.
    + */
    +private[ui] case class TaskTableRowData(
    +    index: Int,
    +    taskId: Long,
    +    attempt: Int,
    +    speculative: Boolean,
    +    status: String,
    +    taskLocality: String,
    +    executorIdAndHost: String,
    +    launchTime: Long,
    +    duration: Long,
    +    formatDuration: String,
    +    schedulerDelay: Long,
    +    taskDeserializationTime: Long,
    +    gcTime: Long,
    +    serializationTime: Long,
    +    gettingResultTime: Long,
    +    accumulators: Option[String], // HTML
    +    input: Option[TaskTableRowInputData],
    +    output: Option[TaskTableRowOutputData],
    +    shuffleRead: Option[TaskTableRowShuffleReadData],
    +    shuffleWrite: Option[TaskTableRowShuffleWriteData],
    +    bytesSpilled: Option[TaskTableRowBytesSpilledData],
    +    error: String)
    +
    +private[ui] class TaskDataSource(
    +    tasks: Seq[TaskUIData],
    +    hasAccumulators: Boolean,
    +    hasInput: Boolean,
    +    hasOutput: Boolean,
    +    hasShuffleRead: Boolean,
    +    hasShuffleWrite: Boolean,
    +    hasBytesSpilled: Boolean,
    +    currentTime: Long,
    +    page: Int,
    +    pageSize: Int,
    +    sortColumn: String,
    +    desc: Boolean) extends PagedDataSource[TaskTableRowData](page: Int, 
pageSize: Int) {
    +  import StagePage._
    +
    +  // Convert TaskUIData to TaskTableRowData which contains the final 
contents to show in the table
    +  // so that we can avoid creating duplicate contents during sorting the 
data
    +  override val data = tasks.map(taskRow).sorted(ordering(sortColumn, desc))
    +
    +  private def taskRow(taskData: TaskUIData): TaskTableRowData = {
    +    val TaskUIData(info, metrics, errorMessage) = taskData
    +    val duration = if (info.status == "RUNNING") 
info.timeRunning(currentTime)
    +    else metrics.map(_.executorRunTime).getOrElse(1L)
    +    val formatDuration = if (info.status == "RUNNING") 
UIUtils.formatDuration(duration)
    +    else metrics.map(m => 
UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
    --- End diff --
    
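    Indentation: the `else` continuation lines of these `val duration` / `val formatDuration` expressions should be indented one level deeper than the `val` they continue, as they were in the removed code.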


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to