Github user nblintao commented on a diff in the pull request:
https://github.com/apache/spark/pull/13620#discussion_r71999178
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---
@@ -369,3 +375,246 @@ private[ui] class AllJobsPage(parent: JobsTab)
extends WebUIPage("") {
}
}
}
+
+private[ui] class JobTableRowData(
+ val jobData: JobUIData,
+ val lastStageName: String,
+ val lastStageDescription: String,
+ val duration: Long,
+ val formattedDuration: String,
+ val submissionTime: Long,
+ val formattedSubmissionTime: String,
+ val jobDescription: NodeSeq,
+ val detailUrl: String)
+
+private[ui] class JobDataSource(
+ jobs: Seq[JobUIData],
+ stageIdToInfo: HashMap[Int, StageInfo],
+ stageIdToData: HashMap[(Int, Int), StageUIData],
+ basePath: String,
+ currentTime: Long,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedDataSource[JobTableRowData](pageSize) {
+
+ // Convert JobUIData to JobTableRowData which contains the final
contents to show in the table
+ // so that we can avoid creating duplicate contents during sorting the
data
+ private val data = jobs.map(jobRow).sorted(ordering(sortColumn, desc))
+
+ private var _slicedJobIds: Set[Int] = null
+
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[JobTableRowData] = {
+ val r = data.slice(from, to)
+ _slicedJobIds = r.map(_.jobData.jobId).toSet
+ r
+ }
+
+ private def getLastStageNameAndDescription(job: JobUIData): (String,
String) = {
+ val lastStageInfo = Option(job.stageIds)
+ .filter(_.nonEmpty)
+ .flatMap { ids => stageIdToInfo.get(ids.max)}
+ val lastStageData = lastStageInfo.flatMap { s =>
+ stageIdToData.get((s.stageId, s.attemptId))
+ }
+ val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+ val description = lastStageData.flatMap(_.description).getOrElse("")
+ (name, description)
+ }
+
+ private def jobRow(jobData: JobUIData): JobTableRowData = {
+ val (lastStageName, lastStageDescription) =
getLastStageNameAndDescription(jobData)
+ val duration: Option[Long] = {
+ jobData.submissionTime.map { start =>
+ val end =
jobData.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ }
+ val formattedDuration = duration.map(d =>
UIUtils.formatDuration(d)).getOrElse("Unknown")
+ val submissionTime = jobData.submissionTime
+ val formattedSubmissionTime =
submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
+ val jobDescription = UIUtils.makeDescription(lastStageDescription,
basePath, plainText = false)
+
+ val detailUrl = "%s/jobs/job?id=%s".format(basePath, jobData.jobId)
+
+ new JobTableRowData (
+ jobData,
+ lastStageName,
+ lastStageDescription,
+ duration.getOrElse(-1),
+ formattedDuration,
+ submissionTime.getOrElse(-1),
+ formattedSubmissionTime,
+ jobDescription,
+ detailUrl
+ )
+ }
+
+ /**
+ * Return Ordering according to sortColumn and desc
+ */
+ private def ordering(sortColumn: String, desc: Boolean):
Ordering[JobTableRowData] = {
+ val ordering = sortColumn match {
+ case "Job Id" | "Job Id (Job Group)" => new
Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.Int.compare(x.jobData.jobId, y.jobData.jobId)
+ }
+ case "Description" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.String.compare(x.lastStageDescription,
y.lastStageDescription)
+ }
+ case "Submitted" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.Long.compare(x.submissionTime, y.submissionTime)
+ }
+ case "Duration" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.Long.compare(x.duration, y.duration)
+ }
+ case "Stages: Succeeded/Total" | "Tasks (for all stages):
Succeeded/Total" =>
+ throw new IllegalArgumentException(s"Unsortable column:
$sortColumn")
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown
column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
+ }
+ }
+
+}
+private[ui] class JobPagedTable(
+ data: Seq[JobUIData],
+ jobTag: String,
+ basePath: String,
+ subPath: String,
+ parameterOtherTable: Iterable[String],
+ stageIdToInfo: HashMap[Int, StageInfo],
+ stageIdToData: HashMap[(Int, Int), StageUIData],
+ currentTime: Long,
+ jobIdTitle: String,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean
+ ) extends PagedTable[JobTableRowData] {
+ val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" +
+ parameterOtherTable.mkString("&")
+
+ override def tableId: String = jobTag + "-table"
+
+ override def tableCssClass: String =
+ "table table-bordered table-condensed table-striped
table-head-clickable"
+
+ override def pageSizeFormField: String = jobTag + ".pageSize"
+
+ override def prevPageSizeFormField: String = jobTag + ".prevPageSize"
+
+ override def pageNumberFormField: String = jobTag + ".page"
+
+ override val dataSource = new JobDataSource(
+ data,
+ stageIdToInfo,
+ stageIdToData,
+ basePath,
+ currentTime,
+ pageSize,
+ sortColumn,
+ desc)
+
+ override def pageLink(page: Int): String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$jobTag.sort=$encodedSortColumn" +
+ s"&$jobTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize"
+ }
+
+ override def goButtonFormPath: String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ s"$parameterPath&$jobTag.sort=$encodedSortColumn&$jobTag.desc=$desc"
+ }
+
+ override def headers: Seq[Node] = {
+ // Information for each header: title, cssClass, and sortable
+ val jobHeadersAndCssClasses: Seq[(String, String, Boolean)] =
+ Seq(
+ (jobIdTitle, "", true),
+ ("Description", "", true), ("Submitted", "", true), ("Duration",
"", true),
+ ("Stages: Succeeded/Total", "", false),
+ ("Tasks (for all stages): Succeeded/Total", "", false)
+ )
+
+ if
(!jobHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) {
+ throw new IllegalArgumentException(s"Unknown column: $sortColumn")
+ }
+
+ val headerRow: Seq[Node] = {
+ jobHeadersAndCssClasses.map { case (header, cssClass, sortable) =>
+ if (header == sortColumn) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$jobTag.desc=${!desc}" +
+ s"&$jobTag.pageSize=$pageSize")
+ val arrow = if (desc) "▾" else "▴" // UP or DOWN
+
+ <th class={cssClass}>
+ <a href={headerLink}>
+ {header}<span>
+ {Unparsed(arrow)}
+ </span>
+ </a>
+ </th>
+ } else {
+ if (sortable) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$jobTag.pageSize=$pageSize")
+
+ <th class={cssClass}>
+ <a href={headerLink}>
+ {header}
+ </a>
+ </th>
+ } else {
+ <th class={cssClass}>
+ {header}
+ </th>
+ }
+ }
+ }
+ }
+ <thead>{headerRow}</thead>
+ }
+
+ override def row(jobTableRow: JobTableRowData): Seq[Node] = {
+ val job = jobTableRow.jobData
+
+ <tr id={"job-" + job.jobId}>
--- End diff --
Yes.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]