Github user carsonwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/7399#discussion_r34866161
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -672,164 +658,605 @@ private[ui] class StagePage(parent: StagesTab)
extends WebUIPage("stage") {
</script>
}
- def taskRow(
- hasAccumulators: Boolean,
- hasInput: Boolean,
- hasOutput: Boolean,
- hasShuffleRead: Boolean,
- hasShuffleWrite: Boolean,
- hasBytesSpilled: Boolean,
- currentTime: Long)(taskData: TaskUIData): Seq[Node] = {
- taskData match { case TaskUIData(info, metrics, errorMessage) =>
- val duration = if (info.status == "RUNNING")
info.timeRunning(currentTime)
- else metrics.map(_.executorRunTime).getOrElse(1L)
- val formatDuration = if (info.status == "RUNNING")
UIUtils.formatDuration(duration)
- else metrics.map(m =>
UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
- val schedulerDelay = metrics.map(getSchedulerDelay(info, _,
currentTime)).getOrElse(0L)
- val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
- val taskDeserializationTime =
metrics.map(_.executorDeserializeTime).getOrElse(0L)
- val serializationTime =
metrics.map(_.resultSerializationTime).getOrElse(0L)
- val gettingResultTime = getGettingResultTime(info, currentTime)
-
- val maybeAccumulators = info.accumulables
- val accumulatorsReadable = maybeAccumulators.map { acc =>
- StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")
+}
+
+private[ui] object StagePage {
+ private[ui] def getGettingResultTime(info: TaskInfo, currentTime: Long):
Long = {
+ if (info.gettingResult) {
+ if (info.finished) {
+ info.finishTime - info.gettingResultTime
+ } else {
+ // The task is still fetching the result.
+ currentTime - info.gettingResultTime
}
+ } else {
+ 0L
+ }
+ }
- val maybeInput = metrics.flatMap(_.inputMetrics)
- val inputSortable =
maybeInput.map(_.bytesRead.toString).getOrElse("")
- val inputReadable = maybeInput
- .map(m => s"${Utils.bytesToString(m.bytesRead)}
(${m.readMethod.toString.toLowerCase()})")
- .getOrElse("")
- val inputRecords =
maybeInput.map(_.recordsRead.toString).getOrElse("")
-
- val maybeOutput = metrics.flatMap(_.outputMetrics)
- val outputSortable =
maybeOutput.map(_.bytesWritten.toString).getOrElse("")
- val outputReadable = maybeOutput
- .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
- .getOrElse("")
- val outputRecords =
maybeOutput.map(_.recordsWritten.toString).getOrElse("")
-
- val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics)
- val shuffleReadBlockedTimeSortable = maybeShuffleRead
- .map(_.fetchWaitTime.toString).getOrElse("")
- val shuffleReadBlockedTimeReadable =
- maybeShuffleRead.map(ms =>
UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
-
- val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
- val shuffleReadSortable =
totalShuffleBytes.map(_.toString).getOrElse("")
- val shuffleReadReadable =
totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
- val shuffleReadRecords =
maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
-
- val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
- val shuffleReadRemoteSortable =
remoteShuffleBytes.map(_.toString).getOrElse("")
- val shuffleReadRemoteReadable =
remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
-
- val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
- val shuffleWriteSortable =
maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
- val shuffleWriteReadable = maybeShuffleWrite
- .map(m =>
s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
- val shuffleWriteRecords = maybeShuffleWrite
- .map(_.shuffleRecordsWritten.toString).getOrElse("")
-
- val maybeWriteTime =
metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
- val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
- val writeTimeReadable = maybeWriteTime.map(t => t / (1000 *
1000)).map { ms =>
- if (ms == 0) "" else UIUtils.formatDuration(ms)
- }.getOrElse("")
-
- val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
- val memoryBytesSpilledSortable =
maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
- val memoryBytesSpilledReadable =
- maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
-
- val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled)
- val diskBytesSpilledSortable =
maybeDiskBytesSpilled.map(_.toString).getOrElse("")
- val diskBytesSpilledReadable =
maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
-
- <tr>
- <td>{info.index}</td>
- <td>{info.taskId}</td>
- <td sorttable_customkey={info.attempt.toString}>{
- if (info.speculative) s"${info.attempt} (speculative)" else
info.attempt.toString
- }</td>
- <td>{info.status}</td>
- <td>{info.taskLocality}</td>
- <td>{info.executorId} / {info.host}</td>
- <td>{UIUtils.formatDate(new Date(info.launchTime))}</td>
- <td sorttable_customkey={duration.toString}>
- {formatDuration}
- </td>
- <td sorttable_customkey={schedulerDelay.toString}
- class={TaskDetailsClassNames.SCHEDULER_DELAY}>
- {UIUtils.formatDuration(schedulerDelay.toLong)}
- </td>
- <td sorttable_customkey={taskDeserializationTime.toString}
- class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
- {UIUtils.formatDuration(taskDeserializationTime.toLong)}
- </td>
- <td sorttable_customkey={gcTime.toString}>
- {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
- </td>
- <td sorttable_customkey={serializationTime.toString}
- class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
- {UIUtils.formatDuration(serializationTime)}
- </td>
- <td sorttable_customkey={gettingResultTime.toString}
- class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
- {UIUtils.formatDuration(gettingResultTime)}
- </td>
- {if (hasAccumulators) {
- <td>
- {Unparsed(accumulatorsReadable.mkString("<br/>"))}
- </td>
- }}
- {if (hasInput) {
- <td sorttable_customkey={inputSortable}>
- {s"$inputReadable / $inputRecords"}
- </td>
- }}
- {if (hasOutput) {
- <td sorttable_customkey={outputSortable}>
- {s"$outputReadable / $outputRecords"}
- </td>
- }}
+ private[ui] def getSchedulerDelay(
+ info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = {
+ if (info.finished) {
+ val totalExecutionTime = info.finishTime - info.launchTime
+ val executorOverhead = (metrics.executorDeserializeTime +
+ metrics.resultSerializationTime)
+ math.max(
+ 0,
+ totalExecutionTime - metrics.executorRunTime - executorOverhead -
+ getGettingResultTime(info, currentTime))
+ } else {
+ // The task is still running and the metrics like executorRunTime
are not available.
+ 0L
+ }
+ }
+}
+
+private[ui] case class TaskTableRowInputData(inputSortable: String,
inputReadable: String)
+
+private[ui] case class TaskTableRowOutputData(outputSortable: String,
outputReadable: String)
+
+private[ui] case class TaskTableRowShuffleReadData(
+ shuffleReadBlockedTimeSortable: String,
+ shuffleReadBlockedTimeReadable: String,
+ shuffleReadSortable: String,
+ shuffleReadReadable: String,
+ shuffleReadRemoteSortable: String,
+ shuffleReadRemoteReadable: String)
+
+private[ui] case class TaskTableRowShuffleWriteData(
+ writeTimeSortable: String,
+ writeTimeReadable: String,
+ shuffleWriteSortable: String,
+ shuffleWriteReadable: String)
+
+private[ui] case class TaskTableRowBytesSpilledData(
+ memoryBytesSpilledSortable: String,
+ memoryBytesSpilledReadable: String,
+ diskBytesSpilledSortable: String,
+ diskBytesSpilledReadable: String)
+
+/**
+ * Contains all data that needs for sorting and generating HTML. Using
this one rather than
+ * TaskUIData to avoid creating duplicate contents during sorting the data.
+ */
+private[ui] case class TaskTableRowData(
+ index: Int,
+ taskId: Long,
+ attempt: Int,
+ speculative: Boolean,
+ status: String,
+ taskLocality: String,
+ executorIdAndHost: String,
+ launchTime: Long,
+ duration: Long,
+ formatDuration: String,
+ schedulerDelay: Long,
+ taskDeserializationTime: Long,
+ gcTime: Long,
+ serializationTime: Long,
+ gettingResultTime: Long,
+ accumulators: Option[String], // HTML
+ input: Option[TaskTableRowInputData],
+ output: Option[TaskTableRowOutputData],
+ shuffleRead: Option[TaskTableRowShuffleReadData],
+ shuffleWrite: Option[TaskTableRowShuffleWriteData],
+ bytesSpilled: Option[TaskTableRowBytesSpilledData],
+ error: String)
+
+private[ui] class TaskDataSource(
+ tasks: Seq[TaskUIData],
+ hasAccumulators: Boolean,
+ hasInput: Boolean,
+ hasOutput: Boolean,
+ hasShuffleRead: Boolean,
+ hasShuffleWrite: Boolean,
+ hasBytesSpilled: Boolean,
+ currentTime: Long,
+ page: Int,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedDataSource[TaskTableRowData](page: Int,
pageSize: Int) {
+ import StagePage._
+
+ // Convert TaskUIData to TaskTableRowData which contains the final
contents to show in the table
+ // so that we can avoid creating duplicate contents during sorting the
data
+ override val data = tasks.map(taskRow).sorted(ordering(sortColumn, desc))
+
+ private def taskRow(taskData: TaskUIData): TaskTableRowData = {
+ val TaskUIData(info, metrics, errorMessage) = taskData
+ val duration = if (info.status == "RUNNING")
info.timeRunning(currentTime)
+ else metrics.map(_.executorRunTime).getOrElse(1L)
+ val formatDuration = if (info.status == "RUNNING")
UIUtils.formatDuration(duration)
+ else metrics.map(m =>
UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
+ val schedulerDelay = metrics.map(getSchedulerDelay(info, _,
currentTime)).getOrElse(0L)
+ val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
+ val taskDeserializationTime =
metrics.map(_.executorDeserializeTime).getOrElse(0L)
+ val serializationTime =
metrics.map(_.resultSerializationTime).getOrElse(0L)
+ val gettingResultTime = getGettingResultTime(info, currentTime)
+
+ val maybeAccumulators = info.accumulables
+ val accumulatorsReadable = maybeAccumulators.map { acc =>
+ StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")
+ }
+
+ val maybeInput = metrics.flatMap(_.inputMetrics)
+ val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
+ val inputReadable = maybeInput
+ .map(m => s"${Utils.bytesToString(m.bytesRead)}
(${m.readMethod.toString.toLowerCase()})")
+ .getOrElse("")
+ val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")
+
+ val maybeOutput = metrics.flatMap(_.outputMetrics)
+ val outputSortable =
maybeOutput.map(_.bytesWritten.toString).getOrElse("")
+ val outputReadable = maybeOutput
+ .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
+ .getOrElse("")
+ val outputRecords =
maybeOutput.map(_.recordsWritten.toString).getOrElse("")
+
+ val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics)
+ val shuffleReadBlockedTimeSortable = maybeShuffleRead
+ .map(_.fetchWaitTime.toString).getOrElse("")
+ val shuffleReadBlockedTimeReadable =
+ maybeShuffleRead.map(ms =>
UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
+
+ val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
+ val shuffleReadSortable =
totalShuffleBytes.map(_.toString).getOrElse("")
+ val shuffleReadReadable =
totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
+ val shuffleReadRecords =
maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
+
+ val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
+ val shuffleReadRemoteSortable =
remoteShuffleBytes.map(_.toString).getOrElse("")
+ val shuffleReadRemoteReadable =
remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
+
+ val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
+ val shuffleWriteSortable =
maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
+ val shuffleWriteReadable = maybeShuffleWrite
+ .map(m =>
s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
+ val shuffleWriteRecords = maybeShuffleWrite
+ .map(_.shuffleRecordsWritten.toString).getOrElse("")
+
+ val maybeWriteTime =
metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
+ val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
+ val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map
{ ms =>
+ if (ms == 0) "" else UIUtils.formatDuration(ms)
+ }.getOrElse("")
+
+ val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
+ val memoryBytesSpilledSortable =
maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
+ val memoryBytesSpilledReadable =
+ maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
+
+ val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled)
+ val diskBytesSpilledSortable =
maybeDiskBytesSpilled.map(_.toString).getOrElse("")
+ val diskBytesSpilledReadable =
maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
+
+ val input =
+ if (hasInput) {
+ Some(TaskTableRowInputData(inputSortable, s"$inputReadable /
$inputRecords"))
+ } else {
+ None
+ }
+
+ val output =
+ if (hasOutput) {
+ Some(TaskTableRowOutputData(outputSortable, s"$outputReadable /
$outputRecords"))
+ } else {
+ None
+ }
+
+ val shuffleRead =
+ if (hasShuffleRead) {
+ Some(TaskTableRowShuffleReadData(
+ shuffleReadBlockedTimeSortable,
+ shuffleReadBlockedTimeReadable,
+ shuffleReadSortable,
+ s"$shuffleReadReadable / $shuffleReadRecords",
+ shuffleReadRemoteSortable,
+ shuffleReadRemoteReadable
+ ))
+ } else {
+ None
+ }
+
+ val shuffleWrite =
+ if (hasShuffleWrite) {
+ Some(TaskTableRowShuffleWriteData(
+ writeTimeSortable,
+ writeTimeReadable,
+ shuffleWriteSortable,
+ s"$shuffleWriteReadable / $shuffleWriteRecords"
+ ))
+ } else {
+ None
+ }
+
+ val bytesSpilled =
+ if (hasBytesSpilled) {
+ Some(TaskTableRowBytesSpilledData(
+ memoryBytesSpilledSortable,
+ memoryBytesSpilledReadable,
+ diskBytesSpilledSortable,
+ diskBytesSpilledReadable
+ ))
+ } else {
+ None
+ }
+
+ TaskTableRowData(
+ info.index,
+ info.taskId,
+ info.attempt,
+ info.speculative,
+ info.status,
+ info.taskLocality.toString,
+ s"${info.executorId} / ${info.host}",
+ info.launchTime,
+ duration,
+ formatDuration,
+ schedulerDelay,
+ taskDeserializationTime,
+ gcTime,
+ serializationTime,
+ gettingResultTime,
+ if (hasAccumulators) Some(accumulatorsReadable.mkString("<br/>"))
else None,
+ input,
+ output,
+ shuffleRead,
+ shuffleWrite,
+ bytesSpilled,
+ errorMessage.getOrElse("")
+ )
+ }
+
+ /**
+ * Return Ordering according to sortColumn and desc
+ */
+ private def ordering(sortColumn: String, desc: Boolean):
Ordering[TaskTableRowData] = {
+ val ordering = sortColumn match {
+ case "Index" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.Int.compare(x.index, y.index)
+ }
+ case "ID" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.Long.compare(x.taskId, y.taskId)
+ }
+ case "Attempt" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.Int.compare(x.attempt, y.attempt)
+ }
+ case "Status" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.String.compare(x.status, y.status)
+ }
+ case "Locality Level" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.String.compare(x.taskLocality, y.taskLocality)
+ }
+ case "Executor ID / Host" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.String.compare(x.executorIdAndHost, y.executorIdAndHost)
+ }
+ case "Launch Time" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.Long.compare(x.launchTime, y.launchTime)
+ }
+ case "Duration" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.Long.compare(x.duration, y.duration)
+ }
+ case "Scheduler Delay" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.Long.compare(x.schedulerDelay, y.schedulerDelay)
+ }
+ case "Task Deserialization Time" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.Long.compare(x.taskDeserializationTime,
y.taskDeserializationTime)
+ }
+ case "GC Time" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.Long.compare(x.gcTime, y.gcTime)
+ }
+ case "Result Serialization Time" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.Long.compare(x.serializationTime, y.serializationTime)
+ }
+ case "Getting Result Time" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.Long.compare(x.gettingResultTime, y.gettingResultTime)
+ }
+ case "Accumulators" =>
+ if (hasAccumulators) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y:
TaskTableRowData): Int =
+ Ordering.String.compare(x.accumulators.get,
y.accumulators.get)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Accumulators because of no accumulators")
+ }
+ case "Input Size / Records" =>
+ if (hasInput) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y:
TaskTableRowData): Int =
+ Ordering.String.compare(x.input.get.inputSortable,
y.input.get.inputSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Input Size / Records because of no inputs")
+ }
+ case "Output Size / Records" =>
+ if (hasOutput) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y:
TaskTableRowData): Int =
+ Ordering.String.compare(x.output.get.outputSortable,
y.output.get.outputSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Input Size / Records because of no outputs")
+ }
+ // ShuffleRead
+ case "Shuffle Read Blocked Time" =>
+ if (hasShuffleRead) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y:
TaskTableRowData): Int =
+
Ordering.String.compare(x.shuffleRead.get.shuffleReadBlockedTimeSortable,
+ y.shuffleRead.get.shuffleReadBlockedTimeSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Input Size / Records because of no shuffle
reads")
+ }
+ case "Shuffle Read Size / Records" =>
+ if (hasShuffleRead) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y:
TaskTableRowData): Int =
+
Ordering.String.compare(x.shuffleRead.get.shuffleReadSortable,
+ y.shuffleRead.get.shuffleReadSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Shuffle Read Size / Records because of no
shuffle reads")
+ }
+ case "Shuffle Remote Reads" =>
+ if (hasShuffleRead) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y:
TaskTableRowData): Int =
+
Ordering.String.compare(x.shuffleRead.get.shuffleReadRemoteSortable,
+ y.shuffleRead.get.shuffleReadRemoteSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Shuffle Remote Reads because of no shuffle
reads")
+ }
+ // ShuffleWrite
+ case "Write Time" =>
+ if (hasShuffleWrite) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y:
TaskTableRowData): Int =
+ Ordering.String.compare(x.shuffleWrite.get.writeTimeReadable,
+ y.shuffleWrite.get.writeTimeReadable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Write Time because of no shuffle writes")
+ }
+ case "Shuffle Write Size / Records" =>
+ if (hasShuffleWrite) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y:
TaskTableRowData): Int =
+
Ordering.String.compare(x.shuffleWrite.get.shuffleWriteSortable,
+ y.shuffleWrite.get.shuffleWriteSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Shuffle Write Size / Records because of no
shuffle writes")
+ }
+ // BytesSpilled
+ case "Shuffle Spill (Memory)" =>
+ if (hasBytesSpilled) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y:
TaskTableRowData): Int =
+
Ordering.String.compare(x.bytesSpilled.get.memoryBytesSpilledSortable,
+ y.bytesSpilled.get.memoryBytesSpilledSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Shuffle Spill (Memory) because of no spills")
+ }
+ case "Shuffle Spill (Disk)" =>
+ if (hasBytesSpilled) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y:
TaskTableRowData): Int =
+
Ordering.String.compare(x.bytesSpilled.get.diskBytesSpilledSortable,
+ y.bytesSpilled.get.diskBytesSpilledSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Shuffle Spill (Disk) because of no spills")
+ }
+ case "Errors" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData):
Int =
+ Ordering.String.compare(x.error, y.error)
+ }
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown
column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
+ }
+ }
+
+}
+
+private[ui] class TaskPagedTable(
+ basePath: String,
+ data: Seq[TaskUIData],
+ hasAccumulators: Boolean,
+ hasInput: Boolean,
+ hasOutput: Boolean,
+ hasShuffleRead: Boolean,
+ hasShuffleWrite: Boolean,
+ hasBytesSpilled: Boolean,
+ currentTime: Long,
+ page: Int,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedTable[TaskTableRowData]{
+
+ override def tableId: String = ""
+
+ override def tableCssClass: String = "table table-bordered
table-condensed table-striped"
+
+ override val dataSource: TaskDataSource = new TaskDataSource(
+ data,
+ hasAccumulators,
+ hasInput,
+ hasOutput,
+ hasShuffleRead,
+ hasShuffleWrite,
+ hasBytesSpilled,
+ currentTime,
+ page,
+ pageSize,
+ sortColumn,
+ desc
+ )
+
+ override def pageLink(page: Int): String = {
+
s"${basePath}&task.page=$page&task.sort=${sortColumn}&task.desc=${desc}"
--- End diff --
Do we need to encode the sortColumn here as well?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]