holdenk commented on a change in pull request #28038: [SPARK-31208][CORE] Add an experimental cleanShuffleDependencies
URL: https://github.com/apache/spark/pull/28038#discussion_r402528402
##########
File path: core/src/main/scala/org/apache/spark/rdd/RDD.scala
##########
@@ -1700,6 +1700,39 @@ abstract class RDD[T: ClassTag](
}
}
+ /**
+ * :: Experimental ::
+ * Marks an RDD's shuffles and those of its non-persisted ancestors as no longer needed.
+ * This cleans up shuffle files aggressively to allow nodes to be terminated.
+ * If you are uncertain of what you are doing please do not use this feature.
+ * Additional techniques for mitigating orphaned shuffle files:
+ * * Tuning the driver GC to be more aggressive so the regular context cleaner is triggered
+ * * Setting an appropriate TTL for shuffle files to be auto cleaned
+ */
+ @Experimental
+ @DeveloperApi
+ @Since("3.1.0")
+ def cleanShuffleDependencies(blocking: Boolean = false): Unit = {
+ sc.cleaner.foreach { cleaner =>
+ /**
+ * Clean the shuffle & all of its parents.
+ */
+ def cleanEagerly(dep: Dependency[_]): Unit = {
+ if (dep.isInstanceOf[ShuffleDependency[_, _, _]]) {
+ val shuffleId = dep.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
+ cleaner.doCleanupShuffle(shuffleId, blocking)
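For context, a call site for this experimental API might look roughly like the sketch below. The data, key function, and object name are illustrative only and are not part of this PR.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: materialize a shuffle-producing job, then eagerly release its
// shuffle files so executors can be scaled down without waiting for driver-side GC
// to trigger the regular ContextCleaner.
object CleanShuffleSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("clean-shuffle-sketch")
      .setIfMissing("spark.master", "local[2]") // local fallback for trying the sketch
    val sc = new SparkContext(conf)

    val counts = sc.parallelize(1 to 1000000)
      .map(i => (i % 100, 1L))
      .reduceByKey(_ + _) // introduces a ShuffleDependency

    counts.cache()
    counts.count() // compute the result before dropping its shuffle inputs

    // Experimental API added in this diff: mark the RDD's shuffles (and those of its
    // non-persisted ancestors) as no longer needed.
    counts.cleanShuffleDependencies(blocking = true)

    sc.stop()
  }
}
```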
Review comment:
Yes. Talking with @dbtsai, he wanted to add a lock on the blocks inside of
`doCleanupShuffle`, but given that the only price is duplicated messages to the
executors, I'm not sure it's worth the overhead of keeping track of that many
locks.
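To make the trade-off concrete, the per-shuffle bookkeeping being weighed here could take a shape like the sketch below. This is a hypothetical illustration, not code from the PR; `ShuffleCleanupDedup` and `markIfNew` are made-up names.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch of per-shuffle deduplication: remember which shuffle ids have
// already been submitted for cleanup so repeated traversals of the same lineage do
// not re-send cleanup messages to the executors.
object ShuffleCleanupDedup {
  // shuffleId -> marker; putIfAbsent returning null means we are the first caller.
  private val submitted = new ConcurrentHashMap[Int, java.lang.Boolean]()

  /** Returns true only the first time a given shuffle id is seen. */
  def markIfNew(shuffleId: Int): Boolean =
    submitted.putIfAbsent(shuffleId, java.lang.Boolean.TRUE) == null
}

// Inside a traversal like cleanEagerly, the guarded call would look like:
//   if (ShuffleCleanupDedup.markIfNew(shuffleId)) {
//     cleaner.doCleanupShuffle(shuffleId, blocking)
//   }
```

The cost of this approach is exactly the bookkeeping mentioned above: the state grows with the number of shuffles tracked, whereas skipping it only means some duplicated (and harmless) cleanup messages reach the executors.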