Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21688#discussion_r219303058
  
    --- Diff: 
core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala ---
    @@ -102,4 +124,87 @@ private[v1] class StagesResource extends 
BaseAppResource {
         withUI(_.store.taskList(stageId, stageAttemptId, offset, length, 
sortBy))
       }
     
    +  @GET
    +  @Path("{stageId: \\d+}/{stageAttemptId: \\d+}/taskTable")
    +  def taskTable(
    +    @PathParam("stageId") stageId: Int,
    +    @PathParam("stageAttemptId") stageAttemptId: Int,
    +    @QueryParam("details") @DefaultValue("true") details: Boolean, 
@Context uriInfo: UriInfo):
    +  util.HashMap[String, Object] = {
    +    withUI { ui =>
    +      val uriQueryParameters = uriInfo.getQueryParameters(true)
    +      val totalRecords = uriQueryParameters.getFirst("numTasks")
    +      var isSearch = false
    +      var searchValue: String = null
    +      var _tasksToShow: Seq[TaskData] = null
    +      if (uriQueryParameters.getFirst("search[value]") != null &&
    +        uriQueryParameters.getFirst("search[value]").length > 0) {
    +        _tasksToShow = ui.store.taskList(stageId, stageAttemptId, 0, 
totalRecords.toInt,
    +          indexName("Index"), true)
    +        isSearch = true
    +        searchValue = uriQueryParameters.getFirst("search[value]")
    +      } else {
    +        _tasksToShow = doPagination(uriQueryParameters, stageId, 
stageAttemptId)
    +      }
    +      if (_tasksToShow.nonEmpty) {
    +        val iterator = _tasksToShow.iterator
    +        while(iterator.hasNext) {
    +          val t1: TaskData = iterator.next()
    +          val execId = t1.executorId
    +          val executorLogs = ui.store.executorSummary(execId).executorLogs
    +          t1.executorLogs = executorLogs
    +          t1.schedulerDelay = AppStatusUtils.schedulerDelay(t1)
    +          t1.gettingResultTime = AppStatusUtils.gettingResultTime(t1)
    +        }
    +        val ret = new util.HashMap[String, Object]()
    +        // Performs server-side search based on input from user
    +        if (isSearch) {
    +          val filteredTaskList = ui.store.filterTaskList(_tasksToShow, 
searchValue)
    +          if (filteredTaskList.length > 0) {
    +            ret.put("aaData", filteredTaskList)
    +          } else {
    +            _tasksToShow = doPagination(uriQueryParameters, stageId, 
stageAttemptId)
    --- End diff --
    
    We may also want to look at pushing the search functionality down into the 
store itself. The store already has a number of optimizations, in its keys and 
such, to do the sorting while trying to keep from filling memory. We should 
investigate whether the search can be pushed down as well, so that at least for 
the history server, where it uses LevelDB, we don't have to pull everything 
into memory.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to