squito commented on a change in pull request #23614: [SPARK-26689][CORE]Support 
blacklisting bad disk directory and retry in DiskBlockManager
URL: https://github.com/apache/spark/pull/23614#discussion_r253126783
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
 ##########
 @@ -48,32 +55,71 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
   // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
+  private[spark] val badDirs = ArrayBuffer[File]()
+  private[spark] val dirToBlacklistExpiryTime = new HashMap[File, Long]
+  // Filename hash to dirId, it should be small enough to put into memory
+  private[spark] val migratedDirIdIndex = new ConcurrentHashMap[Int, Int].asScala
+
   private val shutdownHook = addShutdownHook()
 
   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
   def getFile(filename: String): File = {
+    var mostRecentFailure: Exception = null
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)
-    val dirId = hash % localDirs.length
+    val dirId = migratedDirIdIndex.getOrElse(hash, hash % localDirs.length)
     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
 
     // Create the subdirectory if it doesn't already exist
     val subDir = subDirs(dirId).synchronized {
+      // Update blacklist
+      val now = clock.getTimeMillis()
+      val unblacklisted = badDirs.filter(now >= dirToBlacklistExpiryTime(_))
+      unblacklisted.foreach { dir =>
+        badDirs -= dir
+        dirToBlacklistExpiryTime.remove(dir)
+      }
+
       val old = subDirs(dirId)(subDirId)
       if (old != null) {
         old
       } else {
-        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-        if (!newDir.exists() && !newDir.mkdir()) {
-          throw new IOException(s"Failed to create local dir in $newDir.")
+        assert(!migratedDirIdIndex.contains(dirId))
 
 Review comment:
   Why does `migratedDirIdIndex` key on the raw hash (0 to Int.MaxValue) instead 
of the directory index (0 until localDirs.length)?
   This check is against the index, so it's inconsistent with the rest. Keying 
on the index seems to make more sense to me in general. Most importantly, the 
size stays bounded; otherwise it seems that map could grow quite large and 
never get cleaned up (though it does mean you won't evenly redistribute the 
load).
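
   To make the trade-off concrete, here is a rough sketch of the two keying 
schemes. It is not the PR's code: `DirRemapSketch`, `ByHash`, `ByIndex`, and 
`migrate` are hypothetical names, and `nonNegativeHash` is a local stand-in 
written on the assumption that it behaves like `Utils.nonNegativeHash`.

```scala
import scala.collection.mutable

// Hypothetical illustration only; none of these names exist in Spark.
object DirRemapSketch {
  // Local stand-in for Utils.nonNegativeHash (assumed behaviour):
  // fold hashCode into the range [0, Int.MaxValue].
  def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h == Int.MinValue) 0 else math.abs(h)
  }

  // Keyed by raw filename hash: one entry per migrated hash, so the map
  // grows with the number of distinct files that were moved.
  final class ByHash(numDirs: Int) {
    val migrated = mutable.HashMap.empty[Int, Int]

    def dirFor(filename: String): Int = {
      val hash = nonNegativeHash(filename)
      migrated.getOrElse(hash, hash % numDirs)
    }

    def migrate(filename: String, newDirId: Int): Unit =
      migrated(nonNegativeHash(filename)) = newDirId
  }

  // Keyed by directory index: at most numDirs entries, but every file
  // from a bad directory is sent to the same replacement directory.
  final class ByIndex(numDirs: Int) {
    val migrated = mutable.HashMap.empty[Int, Int]

    def dirFor(filename: String): Int = {
      val original = nonNegativeHash(filename) % numDirs
      migrated.getOrElse(original, original)
    }

    def migrate(badDirId: Int, newDirId: Int): Unit =
      migrated(badDirId) = newDirId
  }
}
```

   With four local dirs and directory 0 marked bad, `ByIndex` needs a single 
entry (0 -> 1) no matter how many files are involved, while `ByHash` adds an 
entry for each distinct hash that originally landed in directory 0. In 
exchange, `ByHash` could send each file to a different healthy directory, 
which `ByIndex` cannot do.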
