GitHub user pgandhi999 commented on a diff in the pull request:
https://github.com/apache/spark/pull/19270#discussion_r144120603
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -340,261 +259,19 @@ private[ui] class StagePage(parent: StagesTab)
extends WebUIPage("stage") {
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" &&
t.metrics.isDefined)
- val summaryTable: Option[Seq[Node]] =
- if (validTasks.isEmpty) {
- None
- }
- else {
- def getDistributionQuantiles(data: Seq[Double]):
IndexedSeq[Double] =
- Distribution(data).get.getQuantiles()
- def getFormattedTimeQuantiles(times: Seq[Double]): Seq[Node] = {
- getDistributionQuantiles(times).map { millis =>
- <td>{UIUtils.formatDuration(millis.toLong)}</td>
- }
- }
- def getFormattedSizeQuantiles(data: Seq[Double]): Seq[Elem] = {
- getDistributionQuantiles(data).map(d =>
<td>{Utils.bytesToString(d.toLong)}</td>)
- }
-
- val deserializationTimes = validTasks.map { taskUIData:
TaskUIData =>
- taskUIData.metrics.get.executorDeserializeTime.toDouble
- }
- val deserializationQuantiles =
- <td>
- <span data-toggle="tooltip"
title={ToolTips.TASK_DESERIALIZATION_TIME}
- data-placement="right">
- Task Deserialization Time
- </span>
- </td> +: getFormattedTimeQuantiles(deserializationTimes)
-
- val serviceTimes = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.executorRunTime.toDouble
- }
- val serviceQuantiles = <td>Duration</td> +:
getFormattedTimeQuantiles(serviceTimes)
-
- val gcTimes = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.jvmGCTime.toDouble
- }
- val gcQuantiles =
- <td>
- <span data-toggle="tooltip"
- title={ToolTips.GC_TIME} data-placement="right">GC Time
- </span>
- </td> +: getFormattedTimeQuantiles(gcTimes)
-
- val serializationTimes = validTasks.map { taskUIData: TaskUIData
=>
- taskUIData.metrics.get.resultSerializationTime.toDouble
- }
- val serializationQuantiles =
- <td>
- <span data-toggle="tooltip"
- title={ToolTips.RESULT_SERIALIZATION_TIME}
data-placement="right">
- Result Serialization Time
- </span>
- </td> +: getFormattedTimeQuantiles(serializationTimes)
-
- val gettingResultTimes = validTasks.map { taskUIData: TaskUIData
=>
- getGettingResultTime(taskUIData.taskInfo, currentTime).toDouble
- }
- val gettingResultQuantiles =
- <td>
- <span data-toggle="tooltip"
- title={ToolTips.GETTING_RESULT_TIME}
data-placement="right">
- Getting Result Time
- </span>
- </td> +:
- getFormattedTimeQuantiles(gettingResultTimes)
-
- val peakExecutionMemory = validTasks.map { taskUIData:
TaskUIData =>
- taskUIData.metrics.get.peakExecutionMemory.toDouble
- }
- val peakExecutionMemoryQuantiles = {
- <td>
- <span data-toggle="tooltip"
- title={ToolTips.PEAK_EXECUTION_MEMORY}
data-placement="right">
- Peak Execution Memory
- </span>
- </td> +: getFormattedSizeQuantiles(peakExecutionMemory)
- }
-
- // The scheduler delay includes the network delay to send the
task to the worker
- // machine and to send back the result (but not the time to
fetch the task result,
- // if it needed to be fetched from the block manager on the
worker).
- val schedulerDelays = validTasks.map { taskUIData: TaskUIData =>
- getSchedulerDelay(taskUIData.taskInfo, taskUIData.metrics.get,
currentTime).toDouble
- }
- val schedulerDelayTitle = <td><span data-toggle="tooltip"
- title={ToolTips.SCHEDULER_DELAY}
data-placement="right">Scheduler Delay</span></td>
- val schedulerDelayQuantiles = schedulerDelayTitle +:
- getFormattedTimeQuantiles(schedulerDelays)
- def getFormattedSizeQuantilesWithRecords(data: Seq[Double],
records: Seq[Double])
- : Seq[Elem] = {
- val recordDist = getDistributionQuantiles(records).iterator
- getDistributionQuantiles(data).map(d =>
- <td>{s"${Utils.bytesToString(d.toLong)} /
${recordDist.next().toLong}"}</td>
- )
- }
-
- val inputSizes = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.inputMetrics.bytesRead.toDouble
- }
-
- val inputRecords = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.inputMetrics.recordsRead.toDouble
- }
-
- val inputQuantiles = <td>Input Size / Records</td> +:
- getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords)
-
- val outputSizes = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.outputMetrics.bytesWritten.toDouble
- }
-
- val outputRecords = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.outputMetrics.recordsWritten.toDouble
- }
-
- val outputQuantiles = <td>Output Size / Records</td> +:
- getFormattedSizeQuantilesWithRecords(outputSizes,
outputRecords)
-
- val shuffleReadBlockedTimes = validTasks.map { taskUIData:
TaskUIData =>
-
taskUIData.metrics.get.shuffleReadMetrics.fetchWaitTime.toDouble
- }
- val shuffleReadBlockedQuantiles =
- <td>
- <span data-toggle="tooltip"
- title={ToolTips.SHUFFLE_READ_BLOCKED_TIME}
data-placement="right">
- Shuffle Read Blocked Time
- </span>
- </td> +:
- getFormattedTimeQuantiles(shuffleReadBlockedTimes)
-
- val shuffleReadTotalSizes = validTasks.map { taskUIData:
TaskUIData =>
-
taskUIData.metrics.get.shuffleReadMetrics.totalBytesRead.toDouble
- }
- val shuffleReadTotalRecords = validTasks.map { taskUIData:
TaskUIData =>
- taskUIData.metrics.get.shuffleReadMetrics.recordsRead.toDouble
- }
- val shuffleReadTotalQuantiles =
- <td>
- <span data-toggle="tooltip"
- title={ToolTips.SHUFFLE_READ} data-placement="right">
- Shuffle Read Size / Records
- </span>
- </td> +:
- getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes,
shuffleReadTotalRecords)
-
- val shuffleReadRemoteSizes = validTasks.map { taskUIData:
TaskUIData =>
-
taskUIData.metrics.get.shuffleReadMetrics.remoteBytesRead.toDouble
- }
- val shuffleReadRemoteQuantiles =
- <td>
- <span data-toggle="tooltip"
- title={ToolTips.SHUFFLE_READ_REMOTE_SIZE}
data-placement="right">
- Shuffle Remote Reads
- </span>
- </td> +:
- getFormattedSizeQuantiles(shuffleReadRemoteSizes)
-
- val shuffleWriteSizes = validTasks.map { taskUIData: TaskUIData
=>
-
taskUIData.metrics.get.shuffleWriteMetrics.bytesWritten.toDouble
- }
-
- val shuffleWriteRecords = validTasks.map { taskUIData:
TaskUIData =>
-
taskUIData.metrics.get.shuffleWriteMetrics.recordsWritten.toDouble
- }
-
- val shuffleWriteQuantiles = <td>Shuffle Write Size /
Records</td> +:
- getFormattedSizeQuantilesWithRecords(shuffleWriteSizes,
shuffleWriteRecords)
-
- val memoryBytesSpilledSizes = validTasks.map { taskUIData:
TaskUIData =>
- taskUIData.metrics.get.memoryBytesSpilled.toDouble
- }
- val memoryBytesSpilledQuantiles = <td>Shuffle spill
(memory)</td> +:
- getFormattedSizeQuantiles(memoryBytesSpilledSizes)
-
- val diskBytesSpilledSizes = validTasks.map { taskUIData:
TaskUIData =>
- taskUIData.metrics.get.diskBytesSpilled.toDouble
- }
- val diskBytesSpilledQuantiles = <td>Shuffle spill (disk)</td> +:
- getFormattedSizeQuantiles(diskBytesSpilledSizes)
-
- val listings: Seq[Seq[Node]] = Seq(
- <tr>{serviceQuantiles}</tr>,
- <tr
class={TaskDetailsClassNames.SCHEDULER_DELAY}>{schedulerDelayQuantiles}</tr>,
- <tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
- {deserializationQuantiles}
- </tr>
- <tr>{gcQuantiles}</tr>,
- <tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
- {serializationQuantiles}
- </tr>,
- <tr
class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
- <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
- {peakExecutionMemoryQuantiles}
- </tr>,
- if (stageData.hasInput) <tr>{inputQuantiles}</tr> else Nil,
- if (stageData.hasOutput) <tr>{outputQuantiles}</tr> else Nil,
- if (stageData.hasShuffleRead) {
- <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
- {shuffleReadBlockedQuantiles}
- </tr>
- <tr>{shuffleReadTotalQuantiles}</tr>
- <tr class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
- {shuffleReadRemoteQuantiles}
- </tr>
- } else {
- Nil
- },
- if (stageData.hasShuffleWrite)
<tr>{shuffleWriteQuantiles}</tr> else Nil,
- if (stageData.hasBytesSpilled)
<tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
- if (stageData.hasBytesSpilled)
<tr>{diskBytesSpilledQuantiles}</tr> else Nil)
-
- val quantileHeaders = Seq("Metric", "Min", "25th percentile",
- "Median", "75th percentile", "Max")
- // The summary table does not use CSS to stripe rows, which
doesn't work with hidden
- // rows (instead, JavaScript in table.js is used to stripe the
non-hidden rows).
- Some(UIUtils.listingTable(
- quantileHeaders,
- identity[Seq[Node]],
- listings,
- fixedWidth = true,
- id = Some("task-summary-table"),
- stripeRowsWithCss = false))
- }
-
- val executorTable = new ExecutorTable(stageId, stageAttemptId,
parent)
-
- val maybeAccumulableTable: Seq[Node] =
- if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable }
else Seq.empty
-
- val aggMetrics =
- <span class="collapse-aggregated-metrics collapse-table"
-
onClick="collapseTable('collapse-aggregated-metrics','aggregated-metrics')">
- <h4>
- <span class="collapse-table-arrow arrow-open"></span>
- <a>Aggregated Metrics by Executor</a>
- </h4>
- </span>
- <div class="aggregated-metrics collapsible-table">
- {executorTable.toNodeSeq}
- </div>
-
val content =
summary ++
- dagViz ++
- showAdditionalMetrics ++
- makeTimeline(
+ dagViz ++ <div id="showAdditionalMetrics"></div> ++
+ makeTimeline(
// Only show the tasks in the table
stageData.taskData.values.toSeq.filter(t =>
taskIdsInPage.contains(t.taskInfo.taskId)),
--- End diff --
Fixed
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]