Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22341#discussion_r216512185
--- Diff:
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -646,7 +647,47 @@ private[spark] class AppStatusListener(
}
override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
- liveRDDs.remove(event.rddId)
+ liveRDDs.remove(event.rddId).foreach { liveRDD =>
+ val executorsToUpdate = new HashSet[LiveExecutor]()
+ val storageLevel = liveRDD.info.storageLevel
+ val distributions = liveRDD.getDistributions()
+
+ // Use RDD distribution to update executor memory and disk usage info.
+ distributions.foreach { case (executorId, rddDist) =>
+ val maybeExec = liveExecutors.get(executorId)
--- End diff --
Yeah, right.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]