Github user tgravescs commented on a diff in the pull request:
https://github.com/apache/spark/pull/21688#discussion_r232738700
--- Diff:
core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala ---
@@ -102,4 +103,120 @@ private[v1] class StagesResource extends
BaseAppResource {
withUI(_.store.taskList(stageId, stageAttemptId, offset, length,
sortBy))
}
+ // This api needs to stay formatted exactly as it is below, since, it is
being used by the
+ // datatables for the stages page.
+ @GET
+ @Path("{stageId: \\d+}/{stageAttemptId: \\d+}/taskTable")
+ def taskTable(
+ @PathParam("stageId") stageId: Int,
+ @PathParam("stageAttemptId") stageAttemptId: Int,
+ @QueryParam("details") @DefaultValue("true") details: Boolean,
+ @Context uriInfo: UriInfo):
+ HashMap[String, Object] = {
+ withUI { ui =>
+ val uriQueryParameters = uriInfo.getQueryParameters(true)
+ val totalRecords = uriQueryParameters.getFirst("numTasks")
+ var isSearch = false
+ var searchValue: String = null
+ var filteredRecords = totalRecords
+ var _tasksToShow: Seq[TaskData] = null
+ // The datatables client API sends a list of query parameters to the
server which contain
+ // information like the columns to be sorted, search value typed by
the user in the search
+ // box, pagination index etc. For more information on these query
parameters,
+ // refer https://datatables.net/manual/server-side.
+ if (uriQueryParameters.getFirst("search[value]") != null &&
+ uriQueryParameters.getFirst("search[value]").length > 0) {
+ _tasksToShow = doPagination(uriQueryParameters, stageId,
stageAttemptId, true,
+ totalRecords.toInt)
+ isSearch = true
+ searchValue = uriQueryParameters.getFirst("search[value]")
+ } else {
+ _tasksToShow = doPagination(uriQueryParameters, stageId,
stageAttemptId, false,
+ totalRecords.toInt)
+ }
+ val ret = new HashMap[String, Object]()
+ if (_tasksToShow.nonEmpty) {
+ // Performs server-side search based on input from user
+ if (isSearch) {
+ val filteredTaskList = filterTaskList(_tasksToShow, searchValue)
+ filteredRecords = filteredTaskList.length.toString
+ if (filteredTaskList.length > 0) {
+ val pageStartIndex = uriQueryParameters.getFirst("start").toInt
+ val pageLength = uriQueryParameters.getFirst("length").toInt
+ ret.put("aaData", filteredTaskList.slice(pageStartIndex,
pageStartIndex + pageLength))
+ } else {
+ ret.put("aaData", filteredTaskList)
+ }
+ } else {
+ ret.put("aaData", _tasksToShow)
+ }
+ } else {
+ ret.put("aaData", _tasksToShow)
+ }
+ ret.put("recordsTotal", totalRecords)
+ ret.put("recordsFiltered", filteredRecords)
+ ret
+ }
+ }
+
+ // Performs pagination on the server side
+ def doPagination(queryParameters: MultivaluedMap[String, String],
stageId: Int,
+ stageAttemptId: Int, isSearch: Boolean, totalRecords: Int):
Seq[TaskData] = {
+ val queryParams = queryParameters.keySet()
+ var columnNameToSort = queryParameters.getFirst("columnNameToSort")
+ if (columnNameToSort.equalsIgnoreCase("Logs")) {
+ columnNameToSort = "Index"
+ }
+ val isAscendingStr = queryParameters.getFirst("order[0][dir]")
+ var pageStartIndex = 0
+ var pageLength = totalRecords
+ if (!isSearch) {
+ pageStartIndex = queryParameters.getFirst("start").toInt
+ pageLength = queryParameters.getFirst("length").toInt
+ }
+ return withUI(_.store.taskList(stageId, stageAttemptId,
pageStartIndex, pageLength,
+ indexName(columnNameToSort), isAscendingStr.equalsIgnoreCase("asc")))
+ }
+
+ // Filters task list based on search parameter
+ def filterTaskList(
+ taskDataList: Seq[TaskData],
+ searchValue: String): Seq[TaskData] = {
--- End diff --
Change this to an Option; then you can just map over it to handle the case
where it might be empty.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]