Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/20481#discussion_r165510869
--- Diff: core/src/main/scala/org/apache/spark/status/KVUtils.scala ---
@@ -69,14 +69,17 @@ private[spark] object KVUtils extends Logging {
db
}
- /** Turns a KVStoreView into a Scala sequence, applying a filter. */
- def viewToSeq[T](
- view: KVStoreView[T],
- max: Int)
- (filter: T => Boolean): Seq[T] = {
+ /**
+ * Turns a KVStoreView into a Scala sequence, applying a filter, sorting the sequence and
+ * selecting the first `max` values.
+ */
+ def viewToSeq[T, S: Ordering](
+ view: KVStoreView[T],
+ max: Int)
+ (filter: T => Boolean)(sorter: T => S): Seq[T] = {
val iter = view.closeableIterator()
try {
- iter.asScala.filter(filter).take(max).toList
+ iter.asScala.filter(filter).toList.sortBy(sorter).take(max)
--- End diff --
So, aside from the two closure parameters making the calls super ugly, this
is more expensive than the previous version.
Previously:
- filter as you iterate over view
- limit iteration
- materialize "max" elements
Now:
- filter as you iterate over view
- materialize all elements that pass the filter
- sort and take "max" elements
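To make the difference concrete, here is a minimal sketch that uses a plain Scala iterator in place of the real `KVStoreView`. `ViewToSeqCost`, `CountingIterator`, `lazyTake` and `sortThenTake` are hypothetical names for illustration only; the two methods just mirror the old and new bodies of `viewToSeq` from the diff.

```scala
object ViewToSeqCost {
  /** Wraps an iterator and counts how many elements are pulled from it. */
  final class CountingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
    var consumed = 0
    override def hasNext: Boolean = underlying.hasNext
    override def next(): T = { consumed += 1; underlying.next() }
  }

  /** Old shape: filter lazily and stop as soon as `max` elements are found. */
  def lazyTake[T](iter: Iterator[T], max: Int)(filter: T => Boolean): Seq[T] =
    iter.filter(filter).take(max).toList

  /** New shape: materialize every match, sort, then keep the first `max`. */
  def sortThenTake[T, S: Ordering](iter: Iterator[T], max: Int)
      (filter: T => Boolean)(sorter: T => S): Seq[T] =
    iter.filter(filter).toList.sortBy(sorter).take(max)
}
```

With a 10,000-element source and `max = 5`, `lazyTake` should pull only a handful of elements, while `sortThenTake` always drains the whole iterator. In the real store, each element pulled is also a deserialization.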
This will, at least, make replaying large apps a lot slower, given the
filter in the task cleanup method.
```
// Try to delete finished tasks only.
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
!live || t.status != TaskState.RUNNING.toString()
}
```
So, when replaying, every time you need to do a cleanup of tasks, you'll
deserialize *all* tasks for the stage. If you have a stage with tens of
thousands of tasks, that's super expensive.
If all you want to change here is the sorting of jobs, I'd recommend adding
a new index to `JobDataWrapper` that sorts them by end time. Then you can do
the sorting before you even call this method, by setting up the `view`
appropriately.
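As a rough model of the index idea, the sketch below keeps entries ordered by end time at write time, so the reader gets them pre-sorted and the original lazy `filter(...).take(max)` is enough. It uses a plain `TreeMap` rather than the real KVStore API; `EndTimeIndexSketch`, `Job`, `JobStore` and `viewByEndTime` are hypothetical stand-ins, not Spark classes.

```scala
import scala.collection.immutable.TreeMap

object EndTimeIndexSketch {
  // Hypothetical stand-in for a stored job record.
  final case class Job(id: Int, endTime: Long)

  // Hypothetical store that maintains a secondary index ordered by
  // (endTime, id); the id breaks ties between jobs ending at the same time.
  final class JobStore {
    private var byEndTime = TreeMap.empty[(Long, Int), Job]

    def write(job: Job): Unit =
      byEndTime = byEndTime.updated((job.endTime, job.id), job)

    // Iterates jobs in end-time order without sorting at read time.
    def viewByEndTime: Iterator[Job] = byEndTime.valuesIterator
  }

  // Same shape as the pre-change viewToSeq: lazy filter, bounded iteration.
  def viewToSeq[T](iter: Iterator[T], max: Int)(filter: T => Boolean): Seq[T] =
    iter.filter(filter).take(max).toList
}
```

The ordering cost is paid once per write instead of on every cleanup, and the caller picks the order by choosing which view to pass in.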
If you also want to sort the others (stages, tasks, and sql executions),
you could also create indices for those.
Or you could find a way to do this that is not so expensive on the replay
side...
If adding indices, though, I'd probably try to get this into 2.3.0 since it
would change the data written to disk.