yaooqinn commented on a change in pull request #26643: [WIP][SPARK-29998][CORE]
Retry getFile() until all folder failed then exit
URL: https://github.com/apache/spark/pull/26643#discussion_r349925722
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -59,27 +59,45 @@ private[spark] class DiskBlockManager(conf: SparkConf,
deleteFilesOnStop: Boolea
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile().
def getFile(filename: String): File = {
- // Figure out which local directory it hashes to, and which subdirectory
in that
val hash = Utils.nonNegativeHash(filename)
- val dirId = hash % localDirs.length
- val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
-
- // Create the subdirectory if it doesn't already exist
- val subDir = subDirs(dirId).synchronized {
- val old = subDirs(dirId)(subDirId)
- if (old != null) {
- old
- } else {
- val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
- if (!newDir.exists() && !newDir.mkdir()) {
- throw new IOException(s"Failed to create local dir in $newDir.")
+ var count = 0
+ val localDirIndex =
scala.collection.mutable.ArrayBuffer(localDirs.indices: _*)
+ var subDir: Option[File] = None
+ while (subDir.isEmpty && count < localDirs.length) {
+ // Figure out which local directory it hashes to, and which subdirectory
in that
+ val hashIndex = hash % localDirIndex.size
+ val dirId = localDirIndex(hashIndex)
+ val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
+ // Create the subdirectory if it doesn't already exist
+ subDirs(dirId).synchronized {
+ val old = subDirs(dirId)(subDirId)
+ if (old != null) {
+ subDir = Some(old)
+ } else {
+ val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
+ try {
+ if (!newDir.exists()) {
+ // If parent dir not exist, create it
+ newDir.mkdirs()
+ }
+ subDirs(dirId)(subDirId) = newDir
+ subDir = Some(newDir)
+ } catch {
+ case e: IOException =>
+ logError(s"Failed to create local dir in $newDir.", e)
+ count = count + 1
+ localDirIndex.remove(hashIndex)
Review comment:
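The `dirId` seems unstable: it is now picked from `localDirIndex`, which shrinks whenever a directory fails, so the same filename can resolve to a different directory depending on earlier failures. I guess this will probably go wrong for the external shuffle service, since this method is supposed to be kept in sync with `ExecutorDiskUtils#getFile()`.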
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]