Github user ajbozarth commented on a diff in the pull request: https://github.com/apache/spark/pull/19270#discussion_r143864647 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala --- @@ -340,261 +259,19 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { // Excludes tasks which failed and have incomplete metrics val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.metrics.isDefined) - val summaryTable: Option[Seq[Node]] = - if (validTasks.isEmpty) { - None - } - else { - def getDistributionQuantiles(data: Seq[Double]): IndexedSeq[Double] = - Distribution(data).get.getQuantiles() - def getFormattedTimeQuantiles(times: Seq[Double]): Seq[Node] = { - getDistributionQuantiles(times).map { millis => - <td>{UIUtils.formatDuration(millis.toLong)}</td> - } - } - def getFormattedSizeQuantiles(data: Seq[Double]): Seq[Elem] = { - getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>) - } - - val deserializationTimes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.executorDeserializeTime.toDouble - } - val deserializationQuantiles = - <td> - <span data-toggle="tooltip" title={ToolTips.TASK_DESERIALIZATION_TIME} - data-placement="right"> - Task Deserialization Time - </span> - </td> +: getFormattedTimeQuantiles(deserializationTimes) - - val serviceTimes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.executorRunTime.toDouble - } - val serviceQuantiles = <td>Duration</td> +: getFormattedTimeQuantiles(serviceTimes) - - val gcTimes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.jvmGCTime.toDouble - } - val gcQuantiles = - <td> - <span data-toggle="tooltip" - title={ToolTips.GC_TIME} data-placement="right">GC Time - </span> - </td> +: getFormattedTimeQuantiles(gcTimes) - - val serializationTimes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.resultSerializationTime.toDouble - } - val 
serializationQuantiles = - <td> - <span data-toggle="tooltip" - title={ToolTips.RESULT_SERIALIZATION_TIME} data-placement="right"> - Result Serialization Time - </span> - </td> +: getFormattedTimeQuantiles(serializationTimes) - - val gettingResultTimes = validTasks.map { taskUIData: TaskUIData => - getGettingResultTime(taskUIData.taskInfo, currentTime).toDouble - } - val gettingResultQuantiles = - <td> - <span data-toggle="tooltip" - title={ToolTips.GETTING_RESULT_TIME} data-placement="right"> - Getting Result Time - </span> - </td> +: - getFormattedTimeQuantiles(gettingResultTimes) - - val peakExecutionMemory = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.peakExecutionMemory.toDouble - } - val peakExecutionMemoryQuantiles = { - <td> - <span data-toggle="tooltip" - title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right"> - Peak Execution Memory - </span> - </td> +: getFormattedSizeQuantiles(peakExecutionMemory) - } - - // The scheduler delay includes the network delay to send the task to the worker - // machine and to send back the result (but not the time to fetch the task result, - // if it needed to be fetched from the block manager on the worker). 
- val schedulerDelays = validTasks.map { taskUIData: TaskUIData => - getSchedulerDelay(taskUIData.taskInfo, taskUIData.metrics.get, currentTime).toDouble - } - val schedulerDelayTitle = <td><span data-toggle="tooltip" - title={ToolTips.SCHEDULER_DELAY} data-placement="right">Scheduler Delay</span></td> - val schedulerDelayQuantiles = schedulerDelayTitle +: - getFormattedTimeQuantiles(schedulerDelays) - def getFormattedSizeQuantilesWithRecords(data: Seq[Double], records: Seq[Double]) - : Seq[Elem] = { - val recordDist = getDistributionQuantiles(records).iterator - getDistributionQuantiles(data).map(d => - <td>{s"${Utils.bytesToString(d.toLong)} / ${recordDist.next().toLong}"}</td> - ) - } - - val inputSizes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.inputMetrics.bytesRead.toDouble - } - - val inputRecords = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.inputMetrics.recordsRead.toDouble - } - - val inputQuantiles = <td>Input Size / Records</td> +: - getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords) - - val outputSizes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.outputMetrics.bytesWritten.toDouble - } - - val outputRecords = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.outputMetrics.recordsWritten.toDouble - } - - val outputQuantiles = <td>Output Size / Records</td> +: - getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords) - - val shuffleReadBlockedTimes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.shuffleReadMetrics.fetchWaitTime.toDouble - } - val shuffleReadBlockedQuantiles = - <td> - <span data-toggle="tooltip" - title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right"> - Shuffle Read Blocked Time - </span> - </td> +: - getFormattedTimeQuantiles(shuffleReadBlockedTimes) - - val shuffleReadTotalSizes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.shuffleReadMetrics.totalBytesRead.toDouble 
- } - val shuffleReadTotalRecords = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.shuffleReadMetrics.recordsRead.toDouble - } - val shuffleReadTotalQuantiles = - <td> - <span data-toggle="tooltip" - title={ToolTips.SHUFFLE_READ} data-placement="right"> - Shuffle Read Size / Records - </span> - </td> +: - getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords) - - val shuffleReadRemoteSizes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.shuffleReadMetrics.remoteBytesRead.toDouble - } - val shuffleReadRemoteQuantiles = - <td> - <span data-toggle="tooltip" - title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right"> - Shuffle Remote Reads - </span> - </td> +: - getFormattedSizeQuantiles(shuffleReadRemoteSizes) - - val shuffleWriteSizes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.shuffleWriteMetrics.bytesWritten.toDouble - } - - val shuffleWriteRecords = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.shuffleWriteMetrics.recordsWritten.toDouble - } - - val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +: - getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords) - - val memoryBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.memoryBytesSpilled.toDouble - } - val memoryBytesSpilledQuantiles = <td>Shuffle spill (memory)</td> +: - getFormattedSizeQuantiles(memoryBytesSpilledSizes) - - val diskBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.diskBytesSpilled.toDouble - } - val diskBytesSpilledQuantiles = <td>Shuffle spill (disk)</td> +: - getFormattedSizeQuantiles(diskBytesSpilledSizes) - - val listings: Seq[Seq[Node]] = Seq( - <tr>{serviceQuantiles}</tr>, - <tr class={TaskDetailsClassNames.SCHEDULER_DELAY}>{schedulerDelayQuantiles}</tr>, - <tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}> - {deserializationQuantiles} - </tr> 
- <tr>{gcQuantiles}</tr>, - <tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}> - {serializationQuantiles} - </tr>, - <tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>, - <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}> - {peakExecutionMemoryQuantiles} - </tr>, - if (stageData.hasInput) <tr>{inputQuantiles}</tr> else Nil, - if (stageData.hasOutput) <tr>{outputQuantiles}</tr> else Nil, - if (stageData.hasShuffleRead) { - <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}> - {shuffleReadBlockedQuantiles} - </tr> - <tr>{shuffleReadTotalQuantiles}</tr> - <tr class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}> - {shuffleReadRemoteQuantiles} - </tr> - } else { - Nil - }, - if (stageData.hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil, - if (stageData.hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil, - if (stageData.hasBytesSpilled) <tr>{diskBytesSpilledQuantiles}</tr> else Nil) - - val quantileHeaders = Seq("Metric", "Min", "25th percentile", - "Median", "75th percentile", "Max") - // The summary table does not use CSS to stripe rows, which doesn't work with hidden - // rows (instead, JavaScript in table.js is used to stripe the non-hidden rows). 
- Some(UIUtils.listingTable( - quantileHeaders, - identity[Seq[Node]], - listings, - fixedWidth = true, - id = Some("task-summary-table"), - stripeRowsWithCss = false)) - } - - val executorTable = new ExecutorTable(stageId, stageAttemptId, parent) - - val maybeAccumulableTable: Seq[Node] = - if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq.empty - - val aggMetrics = - <span class="collapse-aggregated-metrics collapse-table" - onClick="collapseTable('collapse-aggregated-metrics','aggregated-metrics')"> - <h4> - <span class="collapse-table-arrow arrow-open"></span> - <a>Aggregated Metrics by Executor</a> - </h4> - </span> - <div class="aggregated-metrics collapsible-table"> - {executorTable.toNodeSeq} - </div> - val content = summary ++ - dagViz ++ - showAdditionalMetrics ++ - makeTimeline( + dagViz ++ <div id="showAdditionalMetrics"></div> ++ + makeTimeline( // Only show the tasks in the table stageData.taskData.values.toSeq.filter(t => taskIdsInPage.contains(t.taskInfo.taskId)), --- End diff -- (Commenting here since it won't let me comment on the actual line.) `taskIdsInPage` is only used here. Following it back to where it's created, `taskTableHTML` is no longer needed there, so maybe clean that code up a bit.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org