Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20013#discussion_r159990658
  
    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
    @@ -110,107 +114,240 @@ private[spark] class AppStatusStore(
         if (details) stageWithDetails(stage) else stage
       }
     
    +  def taskCount(stageId: Int, stageAttemptId: Int): Long = {
    +    store.count(classOf[TaskDataWrapper], "stage", Array(stageId, 
stageAttemptId))
    +  }
    +
    +  def localitySummary(stageId: Int, stageAttemptId: Int): Map[String, 
Long] = {
    +    store.read(classOf[StageDataWrapper], Array(stageId, 
stageAttemptId)).locality
    +  }
    +
    +  /**
    +   * Calculates a summary of the task metrics for the given stage attempt, 
returning the
    +   * requested quantiles for the recorded metrics.
    +   *
    +   * This method can be expensive if the requested quantiles are not 
cached; the method
    +   * will only cache certain quantiles (every 0.05 step), so it's 
recommended to stick to
    +   * those to avoid expensive scans of all task data.
    +   */
       def taskSummary(
           stageId: Int,
           stageAttemptId: Int,
    -      quantiles: Array[Double]): v1.TaskMetricDistributions = {
    -
    -    val stage = Array(stageId, stageAttemptId)
    -
    -    val rawMetrics = store.view(classOf[TaskDataWrapper])
    -      .index("stage")
    -      .first(stage)
    -      .last(stage)
    -      .asScala
    -      .flatMap(_.info.taskMetrics)
    -      .toList
    -      .view
    -
    -    def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] =
    -      Distribution(rawMetrics.map { d => f(d) 
}).get.getQuantiles(quantiles)
    -
    -    // We need to do a lot of similar munging to nested metrics here.  For 
each one,
    -    // we want (a) extract the values for nested metrics (b) make a 
distribution for each metric
    -    // (c) shove the distribution into the right field in our return type 
and (d) only return
    -    // a result if the option is defined for any of the tasks.  
MetricHelper is a little util
    -    // to make it a little easier to deal w/ all of the nested options.  
Mostly it lets us just
    -    // implement one "build" method, which just builds the quantiles for 
each field.
    -
    -    val inputMetrics =
    -      new MetricHelper[v1.InputMetrics, 
v1.InputMetricDistributions](rawMetrics, quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.InputMetrics = 
raw.inputMetrics
    -
    -        def build: v1.InputMetricDistributions = new 
v1.InputMetricDistributions(
    -          bytesRead = submetricQuantiles(_.bytesRead),
    -          recordsRead = submetricQuantiles(_.recordsRead)
    -        )
    -      }.build
    -
    -    val outputMetrics =
    -      new MetricHelper[v1.OutputMetrics, 
v1.OutputMetricDistributions](rawMetrics, quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.OutputMetrics = 
raw.outputMetrics
    -
    -        def build: v1.OutputMetricDistributions = new 
v1.OutputMetricDistributions(
    -          bytesWritten = submetricQuantiles(_.bytesWritten),
    -          recordsWritten = submetricQuantiles(_.recordsWritten)
    -        )
    -      }.build
    -
    -    val shuffleReadMetrics =
    -      new MetricHelper[v1.ShuffleReadMetrics, 
v1.ShuffleReadMetricDistributions](rawMetrics,
    -        quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleReadMetrics =
    -          raw.shuffleReadMetrics
    -
    -        def build: v1.ShuffleReadMetricDistributions = new 
v1.ShuffleReadMetricDistributions(
    -          readBytes = submetricQuantiles { s => s.localBytesRead + 
s.remoteBytesRead },
    -          readRecords = submetricQuantiles(_.recordsRead),
    -          remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
    -          remoteBytesReadToDisk = 
submetricQuantiles(_.remoteBytesReadToDisk),
    -          remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
    -          localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
    -          totalBlocksFetched = submetricQuantiles { s =>
    -            s.localBlocksFetched + s.remoteBlocksFetched
    -          },
    -          fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
    -        )
    -      }.build
    -
    -    val shuffleWriteMetrics =
    -      new MetricHelper[v1.ShuffleWriteMetrics, 
v1.ShuffleWriteMetricDistributions](rawMetrics,
    -        quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleWriteMetrics =
    -          raw.shuffleWriteMetrics
    -
    -        def build: v1.ShuffleWriteMetricDistributions = new 
v1.ShuffleWriteMetricDistributions(
    -          writeBytes = submetricQuantiles(_.bytesWritten),
    -          writeRecords = submetricQuantiles(_.recordsWritten),
    -          writeTime = submetricQuantiles(_.writeTime)
    -        )
    -      }.build
    -
    -    new v1.TaskMetricDistributions(
    +      unsortedQuantiles: Array[Double]): 
Option[v1.TaskMetricDistributions] = {
    +    val stageKey = Array(stageId, stageAttemptId)
    +    val quantiles = unsortedQuantiles.sorted
    +
    +    // We don't know how many tasks remain in the store that actually have 
metrics. So scan one
    +    // metric and count how many valid tasks there are. Use skip() instead 
of next() since it's
    +    // cheaper for disk stores (avoids deserialization).
    +    val count = {
    +      Utils.tryWithResource(
    +        store.view(classOf[TaskDataWrapper])
    +          .parent(stageKey)
    +          .index(TaskIndexNames.EXEC_RUN_TIME)
    +          .first(0L)
    +          .closeableIterator()
    +      ) { it =>
    +        var _count = 0L
    +        while (it.hasNext()) {
    +          _count += 1
    +          it.skip(1)
    +        }
    +        _count
    +      }
    +    }
    +
    +    if (count <= 0) {
    +      return None
    +    }
    +
    +    // Find out which quantiles are already cached. The data in the store 
must match the expected
    +    // task count to be considered, otherwise it will be re-scanned and 
overwritten.
    +    val cachedQuantiles = quantiles.filter(shouldCacheQuantile).flatMap { 
q =>
    +      val qkey = Array(stageId, stageAttemptId, quantileToString(q))
    +      asOption(store.read(classOf[CachedQuantile], 
qkey)).filter(_.taskCount == count)
    +    }
    +
    +    // If there are no missing quantiles, return the data. Otherwise, just 
compute everything
    +    // to make the code simpler.
    +    if (cachedQuantiles.size == quantiles.size) {
    +      def toValues(fn: CachedQuantile => Double): IndexedSeq[Double] = {
    +        cachedQuantiles.map(fn).toIndexedSeq
    --- End diff --
    
    Minor: you can skip the copy that comes with `toIndexedSeq` here. You 
could use a `WrappedArray`, which is also an `IndexedSeq`. You can just drop the 
`toIndexedSeq` and the wrapping will happen automatically (via an implicit 
conversion in Predef), or you can make it explicit with `WrappedArray.make()`.
    
    ```
    scala> val arr = Array(1,2,3,4)
    arr: Array[Int] = Array(1, 2, 3, 4)
    scala> val x: IndexedSeq[Int] = arr
    x: IndexedSeq[Int] = WrappedArray(1, 2, 3, 4)
    scala> x.toArray eq arr
    res9: Boolean = true
    scala> arr.toIndexedSeq.toArray eq arr
    res10: Boolean = false
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to