otterc commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r604559381



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,29 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
   override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
 }
 
+@DeveloperApi

Review comment:
       Need to add `@Since("3.2.0")`
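       For example, the annotated declaration would look like this (a sketch; `Since` comes from `org.apache.spark.annotation`):

```scala
import org.apache.spark.annotation.{DeveloperApi, Since}

@DeveloperApi
@Since("3.2.0")
case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) extends BlockId {
  override def name: String = "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".data"
}
```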
   

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,29 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
   override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
 }
 
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) extends BlockId {
+  override def name: String = "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".data"
+}
+
+@DeveloperApi

Review comment:
       Need to add `@Since("3.2.0")`

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -83,6 +92,37 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
+  /**
+   * This should be in sync with
+   * org.apache.spark.network.shuffle.RemoteBlockPushResolver#getMergedShuffleFile
+   */
+  def getMergedShuffleFile(blockId: BlockId): File = {
+    blockId match {
+      case mergedBlockId: ShuffleMergedBlockId =>
+        getMergedShuffleFile(mergedBlockId.appId, mergedBlockId.name)
+      case mergedIndexBlockId: ShuffleMergedIndexBlockId =>
+        getMergedShuffleFile(mergedIndexBlockId.appId, mergedIndexBlockId.name)
+      case mergedMetaBlockId: ShuffleMergedMetaBlockId =>
+        getMergedShuffleFile(mergedMetaBlockId.appId, mergedMetaBlockId.name)
+      case _ =>
+        throw new RuntimeException(s"Only merged block ID is supported, but got ${blockId}")
+    }
+  }
+
+  private def getMergedShuffleFile(appId: String, filename: String): File = {

Review comment:
       This needs refactoring. On the server side, we have changed this to leverage `ExecutorDiskUtils.getFile`. I think we need to leverage that here as well.
   
https://github.com/apache/spark/blob/46f96e9ce1cf998f521997bef1bd7367838f0d57/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java#L218
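   A rough sketch of how that could look here (illustrative only; it assumes the merge_manager dirs are available as `activeMergedShuffleDirs: Option[Array[File]]` as in the rest of this PR, and that `subDirsPerLocalDir` matches the value used by the shuffle service):

```scala
import java.io.File
import org.apache.spark.network.shuffle.ExecutorDiskUtils

private def getMergedShuffleFile(filename: String): File = {
  val mergedDirs = activeMergedShuffleDirs.getOrElse(
    throw new IllegalStateException(s"Cannot find the merged shuffle dirs for $filename"))
  // Hash the filename into one of the pre-created merge_manager subdirs, the same
  // way RemoteBlockPushResolver#getMergedShuffleFile resolves it on the server side.
  ExecutorDiskUtils.getFile(mergedDirs.map(_.getAbsolutePath), subDirsPerLocalDir, filename)
}
```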
   

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,29 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
   override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
 }
 
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) extends BlockId {
+  override def name: String = "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".data"
+}
+
+@DeveloperApi
+case class ShuffleMergedIndexBlockId(
+  appId: String,
+  shuffleId: Int,
+  reduceId: Int) extends BlockId {
+  override def name: String =
+    "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".index"
+}
+
+@DeveloperApi
+case class ShuffleMergedMetaBlockId(

Review comment:
       Need to add `@Since("3.2.0")`

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,29 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
   override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
 }
 
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) extends BlockId {
+  override def name: String = "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".data"
+}
+
+@DeveloperApi
+case class ShuffleMergedIndexBlockId(
+  appId: String,
+  shuffleId: Int,
+  reduceId: Int) extends BlockId {
+  override def name: String =
+    "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".index"
+}
+
+@DeveloperApi
+case class ShuffleMergedMetaBlockId(
+  appId: String,

Review comment:
       Nit: indentation

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,29 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
   override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
 }
 
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) extends BlockId {
+  override def name: String = "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".data"
+}
+
+@DeveloperApi
+case class ShuffleMergedIndexBlockId(
+  appId: String,

Review comment:
       Nit: indentation

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +193,58 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks created by external
+   * shuffle services if push based shuffle is enabled. Note that the files in this directory
+   * will be created by the external shuffle services. We only create the merge_manager directories
+   * here because currently the shuffle service doesn't have permission to create directories
+   * under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Unit = {

Review comment:
       If I remember correctly, the existing UT for this only tests for the parent merge_manager dir. Is it possible to extend that UT to also check the subdir creation?
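       A sketch of such an extension, mirroring the tests in this hunk (it assumes `subDirsPerLocalDir` is reachable from the suite):

```scala
test("merge directories should contain the hashed subdirs") {
  testConf.set("spark.local.dir", rootDirs)
  testConf.set("spark.shuffle.push.based.enabled", "true")
  testConf.set("spark.shuffle.service.enabled", "true")
  diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true)
  val mergeDir = new File(rootDir0, DiskBlockManager.MERGE_MANAGER_DIR)
  assert(mergeDir.exists())
  // Every "%02x"-named subdir (00 up to subDirsPerLocalDir - 1) should be pre-created.
  (0 until diskBlockManager.subDirsPerLocalDir).foreach { dirNum =>
    assert(new File(mergeDir, "%02x".format(dirNum)).exists())
  }
}
```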

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -315,6 +315,38 @@ private[spark] object Utils extends Logging {
     dir.getCanonicalFile
   }
 
+  /**
+   * Create a directory that is writable by the group.
+   * Grant 770 permission so the shuffle server can create subdirs/files within the merge folder.
+   */
+  def createDirWith770(dirToCreate: File): Unit = {
+    var attempts = 0
+    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
+    var created: File = null
+    while (created == null) {
+      attempts += 1
+      if (attempts > maxAttempts) {
+        throw new IOException(
+          s"Failed to create directory ${dirToCreate.getAbsolutePath} after " +
+            s"${maxAttempts} attempts!")
+      }
+      try {
+        val builder = new ProcessBuilder().command(
+          "mkdir", "-m770", dirToCreate.getAbsolutePath)
+        val proc = builder.start()
+        val exitCode = proc.waitFor()
+        if (dirToCreate.exists()) {
+          created = dirToCreate
+        }
+        logDebug(
+          s"Created directory at ${dirToCreate.getAbsolutePath} and exitCode 
$exitCode")
+      } catch {
+        case e: SecurityException => created = null;
+      }
+    }
+  }
+
+

Review comment:
       Nit: extra line
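       As an aside on the `mkdir -m770` fork here: a pure-JVM sketch with `java.nio` would look like the following, but unlike `mkdir -m770` the chmod lands after creation, so the directory briefly exists with umask-derived permissions, which is presumably why the fork is used.

```scala
import java.io.File
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermissions

// Hypothetical pure-JVM variant of createDirWith770: create the directory,
// then explicitly set rwxrwx--- (770) so the shuffle server's group can write.
def createDirWith770Nio(dirToCreate: File): Unit = {
  Files.createDirectories(dirToCreate.toPath)
  Files.setPosixFilePermissions(
    dirToCreate.toPath, PosixFilePermissions.fromString("rwxrwx---"))
}
```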

##########
File path: core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
##########
@@ -85,6 +86,36 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     assert(diskBlockManager.getAllBlocks().isEmpty)
   }
 
+  test("find active merged shuffle directories") {
+    testConf.set("spark.local.dir", rootDirs)
+    testConf.set("spark.shuffle.push.based.enabled", "true")
+    testConf.set("spark.shuffle.service.enabled", "true")
+    diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true)
+    assert(diskBlockManager.activeMergedShuffleDirs.isDefined)
+    assert(diskBlockManager.activeMergedShuffleDirs.get.length == diskBlockManager.localDirs.length)
+    val expected = Array(rootDir0.getAbsolutePath, rootDir1.getAbsolutePath).sorted
+    val actual = diskBlockManager.activeMergedShuffleDirs.get.map(file => file.getParent)
+    assert(expected sameElements actual)
+  }
+
+  test("should not create merge directories if one already exists under a 
local dir") {
+    val mergeDir0 = new File(rootDir0, DiskBlockManager.MERGE_MANAGER_DIR)
+    if (!mergeDir0.exists()) {
+      Files.createDirectories(mergeDir0.toPath)
+    }
+    val mergeDir1 = new File(rootDir1, DiskBlockManager.MERGE_MANAGER_DIR)
+    if (mergeDir1.exists()) {
+      Utils.deleteRecursively(mergeDir1)
+    }
+    testConf.set("spark.local.dir", rootDirs)
+    testConf.set("spark.shuffle.push.based.enabled", "true")
+    testConf.set("spark.shuffle.service.enabled", "true")
+    diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true)
+    assert(diskBlockManager.activeMergedShuffleDirs.isDefined)
+    assert(diskBlockManager.activeMergedShuffleDirs.get.length == 1)
+  }
+
+

Review comment:
       Nit: extra line

##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -110,6 +110,22 @@ private[spark] class IndexShuffleBlockResolver(
       .getOrElse(blockManager.diskBlockManager.getFile(blockId))
   }
 
+  private def getMergedBlockDataFile(appId: String, shuffleId: Int, reduceId: Int): File = {
+    blockManager.diskBlockManager.getMergedShuffleFile(
+      ShuffleMergedBlockId(appId, shuffleId, reduceId))
+  }
+
+  private def getMergedBlockIndexFile(appId: String, shuffleId: Int, reduceId: Int): File = {
+    blockManager.diskBlockManager.getMergedShuffleFile(
+      ShuffleMergedIndexBlockId(appId, shuffleId, reduceId))
+  }
+
+  private def getMergedBlockMetaFile(appId: String, shuffleId: Int, reduceId: Int): File = {
+    blockManager.diskBlockManager.getMergedShuffleFile(
+      ShuffleMergedMetaBlockId(appId, shuffleId, reduceId))
+  }
+
+

Review comment:
       Nit: extra line

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +193,58 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks created by external
+   * shuffle services if push based shuffle is enabled. Note that the files in this directory
+   * will be created by the external shuffle services. We only create the merge_manager directories
+   * here because currently the shuffle service doesn't have permission to create directories
+   * under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Unit = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist under any local dir.
+      val localDirs = Utils.getConfiguredLocalDirs(conf)
+      for (rootDir <- localDirs) {
+        val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+        if (mergeDir.exists()) {
+          logDebug(s"Not creating $mergeDir as it already exists")
+          return
+        }
+      }
+      // Since this executor didn't see any merge_manager directories, it will start creating them.
+      // It's possible that the other executors launched at the same time may also reach here but
+      // we are working on the assumption that the executors launched around the same time will
+      // have the same set of application local directories.
+      localDirs.flatMap { rootDir =>
+        try {
+          val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+          // Only one container will create this directory. The filesystem will handle any race
+          // conditions.
+          if (!mergeDir.exists()) {
+            for (dirNum <- 0 until subDirsPerLocalDir) {
+              val subDir = new File(mergeDir, "%02x".format(dirNum))
+              Utils.createDirWith770(subDir)
+            }
+          }
+          logInfo(s"Merge directory at $mergeDir")
+          Some(mergeDir)
+        } catch {
+          case e: IOException =>
+            logError(
+              s"Failed to create merge dir in $rootDir. Ignoring this 
directory.", e)
+            None
+        }
+      }
+      Utils.getConfiguredLocalDirs(conf).map(rootDir => new File(rootDir, MERGE_MANAGER_DIR))
+    }
+  }
+
+  private def findActiveMergedShuffleDirs(conf: SparkConf): Option[Array[File]] = {
+    Option(Utils.getConfiguredLocalDirs(conf).map(
+      rootDir => new File(rootDir, "merge_manager")).filter(mergeDir => mergeDir.exists()))
+  }
+
+

Review comment:
       Nit: extra line

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +193,58 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks created by external
+   * shuffle services if push based shuffle is enabled. Note that the files in this directory
+   * will be created by the external shuffle services. We only create the merge_manager directories

Review comment:
       Nit: change the comment to also mention the merge_manager subdirs
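       For example, something along these lines (just a suggested wording):

```scala
/**
 * Create the merge_manager directory and its hashed subdirectories under each configured
 * local dir if push based shuffle is enabled. Note that the files in these directories
 * will be created by the external shuffle services; we only pre-create merge_manager and
 * its subdirs here because currently the shuffle service doesn't have permission to
 * create directories under application local directories.
 */
```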

##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -343,6 +359,50 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  /**
+   * This is only used for reading local merged block data. In such cases, all chunks in the
+   * merged shuffle file need to be identified at once, so the ShuffleBlockFetcherIterator
+   * knows how to consume local merged shuffle file as multiple chunks.
+   */
+  override def getMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer] = {
+    val indexFile = getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, blockId.reduceId)
+    val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId, blockId.reduceId)
+    // Load all the indexes in order to identify all chunks in the specified merged shuffle file.
+    val size = indexFile.length.toInt
+    val buffer = ByteBuffer.allocate(size)
+    val offsets = buffer.asLongBuffer
+    val dis = new DataInputStream(Files.newInputStream(indexFile.toPath))
+    try {
+      dis.readFully(buffer.array)
+    } finally {
+      dis.close()
+    }
+    // Number of chunks is number of indexes - 1
+    val numChunks = size / 8 - 1
+    val chunkSizes = new Array[Long](numChunks)
+    for (index <- 0 until numChunks) {
+      chunkSizes(index) = offsets.get(index + 1) - offsets.get(index)
+    }
+    chunkSizes.indices.map {
+      index =>
+        new FileSegmentManagedBuffer(transportConf, dataFile,
+          offsets.get(index), chunkSizes(index))
+    }
+  }
+
+  /**
+   * This is only used for reading local merged block meta data.
+   */
+  override def getMergedBlockMeta(blockId: ShuffleBlockId): MergedBlockMeta = {
+    val indexFile = getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, blockId.reduceId)
+    val size = indexFile.length.toInt
+    val numChunks = (size / 8) - 1
+    val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId, blockId.reduceId)
+    val chunkBitMaps = new FileSegmentManagedBuffer(transportConf, metaFile, 0L, metaFile.length)
+    new MergedBlockMeta(numChunks, chunkBitMaps)
+  }
+
+

Review comment:
       Nit: extra line
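       Also, a quick worked check of the chunk math in this hunk: a 24-byte index file holds three long offsets, e.g. `[0, 1000, 2500]`, so `numChunks = 24 / 8 - 1 = 2`, and the chunk sizes are the consecutive differences, 1000 and 1500.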




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
