liupc commented on a change in pull request #23614: [SPARK-26689][CORE]Support
blacklisting bad disk directory and retry in DiskBlockManager
URL: https://github.com/apache/spark/pull/23614#discussion_r253260966
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -48,32 +55,71 @@ private[spark] class DiskBlockManager(conf: SparkConf,
deleteFilesOnStop: Boolea
// of subDirs(i) is protected by the lock of subDirs(i)
private val subDirs = Array.fill(localDirs.length)(new
Array[File](subDirsPerLocalDir))
+ private[spark] val badDirs = ArrayBuffer[File]()
+ private[spark] val dirToBlacklistExpiryTime = new HashMap[File, Long]
+ // Filename hash to dirId, it should be small enough to put into memory
+ private[spark] val migratedDirIdIndex = new ConcurrentHashMap[Int,
Int].asScala
+
private val shutdownHook = addShutdownHook()
/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
def getFile(filename: String): File = {
+ var mostRecentFailure: Exception = null
// Figure out which local directory it hashes to, and which subdirectory
in that
val hash = Utils.nonNegativeHash(filename)
- val dirId = hash % localDirs.length
+ val dirId = migratedDirIdIndex.getOrElse(hash, hash % localDirs.length)
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized {
+ // Update blacklist
+ val now = clock.getTimeMillis()
+ val unblacklisted = badDirs.filter(now >= dirToBlacklistExpiryTime(_))
+ unblacklisted.foreach { dir =>
+ badDirs -= dir
+ dirToBlacklistExpiryTime.remove(dir)
+ }
+
val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
- val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
- if (!newDir.exists() && !newDir.mkdir()) {
- throw new IOException(s"Failed to create local dir in $newDir.")
+ assert(!migratedDirIdIndex.contains(dirId))
Review comment:
@squito Yes, that's it. And I think it should be unlikely that it might grow
very large. suppose we would wirte 10000 files, then the size of it might be
10000 * (int type entry size, ~ 10+ bytes), I think it's small enough.
Thanks for careful review.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]