GitHub user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22341#discussion_r216257678
--- Diff:
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -646,7 +647,47 @@ private[spark] class AppStatusListener(
}
override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
- liveRDDs.remove(event.rddId)
+ liveRDDs.remove(event.rddId).foreach { liveRDD =>
+ val executorsToUpdate = new HashSet[LiveExecutor]()
+ val storageLevel = liveRDD.info.storageLevel
+ val distributions = liveRDD.getDistributions()
+
+ // Use RDD distribution to update executor memory and disk usage info.
+ distributions.foreach { case (executorId, rddDist) =>
+ val maybeExec = liveExecutors.get(executorId)
+
+ maybeExec.foreach { exec =>
+ if (exec.hasMemoryInfo) {
+ if (storageLevel.useOffHeap) {
+ exec.usedOffHeap = math.max(0, exec.usedOffHeap -
rddDist.offHeapUsed)
+ } else {
+ exec.usedOnHeap = math.max(0, exec.usedOnHeap -
rddDist.onHeapUsed)
+ }
+ }
+ exec.memoryUsed = math.max(0, exec.memoryUsed -
rddDist.memoryUsed)
+ exec.diskUsed = math.max(0, exec.diskUsed - rddDist.diskUsed)
+ executorsToUpdate += exec
+ }
+ }
+
+ // Use RDD partition info to update executor block info.
+ val partitions = liveRDD.getPartitions()
+
+ partitions.foreach { case (_, part) =>
+ val executors = part.executors
--- End diff --
No, a partition can exist on more than one executor.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]