iRakson commented on a change in pull request #26756:
[SPARK-30119][WebUI]Support Pagination for Batch Tables in Streaming Tab
URL: https://github.com/apache/spark/pull/26756#discussion_r358069458
##########
File path:
streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
##########
@@ -17,39 +17,122 @@
package org.apache.spark.streaming.ui
-import scala.xml.Node
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets.UTF_8
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, Unparsed}
+
+import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils =>
SparkUIUtils}
+
+private[ui] class StreamingBatchPagedTable(
+ request: HttpServletRequest,
+ parent: StreamingTab,
+ batchInterval: Long,
+ batchData: Seq[BatchUIData],
+ streamingBatchTag: String,
+ basePath: String,
+ subPath: String,
+ parameterOtherTable: Iterable[String],
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedTable[BatchUIData] {
+
+ override val dataSource = new StreamingBatchTableDataSource(batchData,
pageSize, sortColumn, desc)
+ private val parameterPath =
s"$basePath/$subPath/?${parameterOtherTable.mkString("&")}"
+ private val firstFailureReason = getFirstFailureReason(batchData)
+
+ override def tableId: String = streamingBatchTag
+
+ override def tableCssClass: String =
+ "table table-bordered table-condensed table-striped " +
+ "table-head-clickable table-cell-width-limited"
+
+ override def pageLink(page: Int): String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$streamingBatchTag.sort=$encodedSortColumn" +
+ s"&$streamingBatchTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize"
+ }
-import org.apache.spark.ui.{UIUtils => SparkUIUtils}
+ override def pageSizeFormField: String = s"$streamingBatchTag.pageSize"
-private[ui] abstract class BatchTableBase(tableId: String, batchInterval:
Long) {
+ override def pageNumberFormField: String = s"$streamingBatchTag.page"
- protected def columns: Seq[Node] = {
- <th>Batch Time</th>
- <th>Records</th>
- <th>Scheduling Delay
- {SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit
jobs of a batch", "top")}
- </th>
- <th>Processing Time
- {SparkUIUtils.tooltip("Time taken to process all jobs of a batch",
"top")}</th>
+ override def goButtonFormPath: String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
s"$parameterPath&$streamingBatchTag.sort=$encodedSortColumn&$streamingBatchTag.desc=$desc"
}
- /**
- * Return the first failure reason if finding in the batches.
- */
- protected def getFirstFailureReason(batches: Seq[BatchUIData]):
Option[String] = {
- batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
- }
-
- protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
- val firstFailureReason =
batch.outputOperations.flatMap(_._2.failureReason).headOption
- firstFailureReason.map { failureReason =>
- val failureReasonForUI =
UIUtils.createOutputOperationFailureForUI(failureReason)
- UIUtils.failureReasonCell(
- failureReasonForUI, rowspan = 1, includeFirstLineInExpandDetails =
false)
- }.getOrElse(<td>-</td>)
+ override def headers: Seq[Node] = {
+ val completedBatchTableHeaders = Seq("Batch Time", "Records", "Scheduling
Delay",
Review comment:
Both tables are identical, i.e., the schema is the same for both, so the
headers will remain the same for both. But yeah, `completedBatchTableHeaders`
is misleading, so I will update the variable name.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]