Github user pgandhi999 commented on a diff in the pull request: https://github.com/apache/spark/pull/21688#discussion_r235184363 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala --- @@ -315,187 +241,22 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We } </script> - val metricsSummary = store.taskSummary(stageData.stageId, stageData.attemptId, - Array(0, 0.25, 0.5, 0.75, 1.0)) - - val summaryTable = metricsSummary.map { metrics => - def timeQuantiles(data: IndexedSeq[Double]): Seq[Node] = { - data.map { millis => - <td>{UIUtils.formatDuration(millis.toLong)}</td> - } - } - - def sizeQuantiles(data: IndexedSeq[Double]): Seq[Node] = { - data.map { size => - <td>{Utils.bytesToString(size.toLong)}</td> - } - } - - def sizeQuantilesWithRecords( - data: IndexedSeq[Double], - records: IndexedSeq[Double]) : Seq[Node] = { - data.zip(records).map { case (d, r) => - <td>{s"${Utils.bytesToString(d.toLong)} / ${r.toLong}"}</td> - } - } - - def titleCell(title: String, tooltip: String): Seq[Node] = { - <td> - <span data-toggle="tooltip" title={tooltip} data-placement="right"> - {title} - </span> - </td> - } - - def simpleTitleCell(title: String): Seq[Node] = <td>{title}</td> - - val deserializationQuantiles = titleCell("Task Deserialization Time", - ToolTips.TASK_DESERIALIZATION_TIME) ++ timeQuantiles(metrics.executorDeserializeTime) - - val serviceQuantiles = simpleTitleCell("Duration") ++ timeQuantiles(metrics.executorRunTime) - - val gcQuantiles = titleCell("GC Time", ToolTips.GC_TIME) ++ timeQuantiles(metrics.jvmGcTime) - - val serializationQuantiles = titleCell("Result Serialization Time", - ToolTips.RESULT_SERIALIZATION_TIME) ++ timeQuantiles(metrics.resultSerializationTime) - - val gettingResultQuantiles = titleCell("Getting Result Time", ToolTips.GETTING_RESULT_TIME) ++ - timeQuantiles(metrics.gettingResultTime) - - val peakExecutionMemoryQuantiles = titleCell("Peak Execution Memory", - 
ToolTips.PEAK_EXECUTION_MEMORY) ++ sizeQuantiles(metrics.peakExecutionMemory) - - // The scheduler delay includes the network delay to send the task to the worker - // machine and to send back the result (but not the time to fetch the task result, - // if it needed to be fetched from the block manager on the worker). - val schedulerDelayQuantiles = titleCell("Scheduler Delay", ToolTips.SCHEDULER_DELAY) ++ - timeQuantiles(metrics.schedulerDelay) - - def inputQuantiles: Seq[Node] = { - simpleTitleCell("Input Size / Records") ++ - sizeQuantilesWithRecords(metrics.inputMetrics.bytesRead, metrics.inputMetrics.recordsRead) - } - - def outputQuantiles: Seq[Node] = { - simpleTitleCell("Output Size / Records") ++ - sizeQuantilesWithRecords(metrics.outputMetrics.bytesWritten, - metrics.outputMetrics.recordsWritten) - } - - def shuffleReadBlockedQuantiles: Seq[Node] = { - titleCell("Shuffle Read Blocked Time", ToolTips.SHUFFLE_READ_BLOCKED_TIME) ++ - timeQuantiles(metrics.shuffleReadMetrics.fetchWaitTime) - } - - def shuffleReadTotalQuantiles: Seq[Node] = { - titleCell("Shuffle Read Size / Records", ToolTips.SHUFFLE_READ) ++ - sizeQuantilesWithRecords(metrics.shuffleReadMetrics.readBytes, - metrics.shuffleReadMetrics.readRecords) - } - - def shuffleReadRemoteQuantiles: Seq[Node] = { - titleCell("Shuffle Remote Reads", ToolTips.SHUFFLE_READ_REMOTE_SIZE) ++ - sizeQuantiles(metrics.shuffleReadMetrics.remoteBytesRead) - } - - def shuffleWriteQuantiles: Seq[Node] = { - simpleTitleCell("Shuffle Write Size / Records") ++ - sizeQuantilesWithRecords(metrics.shuffleWriteMetrics.writeBytes, - metrics.shuffleWriteMetrics.writeRecords) - } - - def memoryBytesSpilledQuantiles: Seq[Node] = { - simpleTitleCell("Shuffle spill (memory)") ++ sizeQuantiles(metrics.memoryBytesSpilled) - } - - def diskBytesSpilledQuantiles: Seq[Node] = { - simpleTitleCell("Shuffle spill (disk)") ++ sizeQuantiles(metrics.diskBytesSpilled) - } - - val listings: Seq[Seq[Node]] = Seq( - <tr>{serviceQuantiles}</tr>, - 
<tr class={TaskDetailsClassNames.SCHEDULER_DELAY}>{schedulerDelayQuantiles}</tr>, - <tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}> - {deserializationQuantiles} - </tr> - <tr>{gcQuantiles}</tr>, - <tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}> - {serializationQuantiles} - </tr>, - <tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>, - <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}> - {peakExecutionMemoryQuantiles} - </tr>, - if (hasInput(stageData)) <tr>{inputQuantiles}</tr> else Nil, - if (hasOutput(stageData)) <tr>{outputQuantiles}</tr> else Nil, - if (hasShuffleRead(stageData)) { - <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}> - {shuffleReadBlockedQuantiles} - </tr> - <tr>{shuffleReadTotalQuantiles}</tr> - <tr class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}> - {shuffleReadRemoteQuantiles} - </tr> - } else { - Nil - }, - if (hasShuffleWrite(stageData)) <tr>{shuffleWriteQuantiles}</tr> else Nil, - if (hasBytesSpilled(stageData)) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil, - if (hasBytesSpilled(stageData)) <tr>{diskBytesSpilledQuantiles}</tr> else Nil) - - val quantileHeaders = Seq("Metric", "Min", "25th percentile", "Median", "75th percentile", - "Max") - // The summary table does not use CSS to stripe rows, which doesn't work with hidden - // rows (instead, JavaScript in table.js is used to stripe the non-hidden rows). 
- UIUtils.listingTable( - quantileHeaders, - identity[Seq[Node]], - listings, - fixedWidth = true, - id = Some("task-summary-table"), - stripeRowsWithCss = false) - } - - val executorTable = new ExecutorTable(stageData, parent.store) - - val maybeAccumulableTable: Seq[Node] = - if (hasAccumulators(stageData)) { <h4>Accumulators</h4> ++ accumulableTable } else Seq() - - val aggMetrics = - <span class="collapse-aggregated-metrics collapse-table" - onClick="collapseTable('collapse-aggregated-metrics','aggregated-metrics')"> - <h4> - <span class="collapse-table-arrow arrow-open"></span> - <a>Aggregated Metrics by Executor</a> - </h4> - </span> - <div class="aggregated-metrics collapsible-table"> - {executorTable.toNodeSeq} - </div> - val content = summary ++ - dagViz ++ - showAdditionalMetrics ++ + dagViz ++ <div id="showAdditionalMetrics"></div> ++ makeTimeline( // Only show the tasks in the table - Option(taskTable).map(_.dataSource.tasks).getOrElse(Nil), - currentTime) ++ - <h4>Summary Metrics for <a href="#tasks-section">{numCompleted} Completed Tasks</a></h4> ++ - <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++ - aggMetrics ++ - maybeAccumulableTable ++ - <span id="tasks-section" class="collapse-aggregated-tasks collapse-table" - onClick="collapseTable('collapse-aggregated-tasks','aggregated-tasks')"> - <h4> - <span class="collapse-table-arrow arrow-open"></span> - <a>Tasks ({totalTasksNumStr})</a> - </h4> - </span> ++ - <div class="aggregated-tasks collapsible-table"> - {taskTableHTML ++ jsForScrollingDownToTaskTable} - </div> - UIUtils.headerSparkPage(request, stageHeader, content, parent, showVisualization = true) + Option(taskTable).map({ taskPagedTable => + val from = (taskPage - 1) * taskPageSize + val to = taskPagedTable.dataSource.dataSize.min(taskPage * taskPageSize) --- End diff -- Done
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org