attilapiros commented on a change in pull request #23614:
[SPARK-26689][CORE]Support blacklisting bad disk directory and retry in
DiskBlockManager
URL: https://github.com/apache/spark/pull/23614#discussion_r253458901
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -48,32 +56,69 @@ private[spark] class DiskBlockManager(conf: SparkConf,
deleteFilesOnStop: Boolea
// of subDirs(i) is protected by the lock of subDirs(i)
private val subDirs = Array.fill(localDirs.length)(new
Array[File](subDirsPerLocalDir))
+ private[spark] val badDirs = ArrayBuffer[File]()
+ private[spark] val dirToBlacklistExpiryTime = new HashMap[File, Long]
+ // Filename hash to dirId, it should be small enough to put into memory
+ private[spark] val migratedDirIdIndex = new ConcurrentHashMap[Int,
Int].asScala
+
private val shutdownHook = addShutdownHook()
/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
def getFile(filename: String): File = {
+ var mostRecentFailure: Exception = null
// Figure out which local directory it hashes to, and which subdirectory
in that
val hash = Utils.nonNegativeHash(filename)
- val dirId = hash % localDirs.length
+ val dirId = migratedDirIdIndex.getOrElse(hash, hash % localDirs.length)
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized {
+ // Update blacklist
+ val now = clock.getTimeMillis()
+ val unblacklisted = badDirs.filter(now >= dirToBlacklistExpiryTime(_))
+ unblacklisted.foreach { dir =>
+ badDirs -= dir
+ dirToBlacklistExpiryTime.remove(dir)
+ }
+
val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
- val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
- if (!newDir.exists() && !newDir.mkdir()) {
- throw new IOException(s"Failed to create local dir in $newDir.")
+ assert(!migratedDirIdIndex.contains(dirId))
+ var succeed = false
Review comment:
But when IOException occurs you have to set newDir to null in the catch.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]