otterc commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r642027998
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -728,6 +736,27 @@ private[spark] class BlockManager(
}
}
+ /**
+ * Get the local merged shuffle block data for the given block ID as multiple chunks.
+ * A merged shuffle file is divided into multiple chunks according to the index file.
+ * Instead of reading the entire file as a single block, we split it into smaller chunks
+ * which will be memory efficient when performing certain operations.
+ */
+ def getLocalMergedBlockData(
Review comment:
Can we keep this name consistent with what is in the `ShuffleBlockResolver`, i.e. `getMergedBlockData`?
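For example (sketch only; the body is exactly what this PR already has, only the name changes to match the resolver):
```
def getMergedBlockData(
    blockId: ShuffleBlockId,
    dirs: Array[String]): Seq[ManagedBuffer] = {
  shuffleManager.shuffleBlockResolver.getMergedBlockData(blockId, Some(dirs))
}
```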
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -149,6 +178,12 @@ object BlockId {
ShuffleIndexBlockId(shuffleId.toInt, mapId.toLong, reduceId.toInt)
case SHUFFLE_PUSH(shuffleId, mapIndex, reduceId) =>
ShufflePushBlockId(shuffleId.toInt, mapIndex.toInt, reduceId.toInt)
+ case SHUFFLE_MERGED(appId, shuffleId, reduceId) =>
Review comment:
Nit: Shall we make this consistent and call it `SHUFFLE_MERGED_DATA`?
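For example (hypothetical rename only, so the regex val and the match case in `BlockId.apply` both line up with the index/meta names):
```
// In the regex list:
val SHUFFLE_MERGED_DATA = "shuffleMerged_([_A-Za-z0-9]*)_([0-9]+)_([0-9]+).data".r

// In apply():
case SHUFFLE_MERGED_DATA(appId, shuffleId, reduceId) =>
  ShuffleMergedBlockId(appId, shuffleId.toInt, reduceId.toInt)
```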
##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -343,6 +370,50 @@ private[spark] class IndexShuffleBlockResolver(
}
}
+ override def getMergedBlockData(
Review comment:
Nit: Missing javadoc
```
/**
* This is only used for reading local merged block data. In such cases, all chunks in the
* merged shuffle file need to be identified at once, so the ShuffleBlockFetcherIterator
* knows how to consume a local merged shuffle file as multiple chunks.
*/
```
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -504,7 +504,8 @@ private[spark] class BlockManager(
hostLocalDirManager = {
if (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
- !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
+ !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL) ||
+ Utils.isPushBasedShuffleEnabled(conf)) {
Review comment:
When push-based shuffle is enabled, we want the hostLocalDirManager to be initialized, and push-based shuffle works with the old fetch protocol as well. Should this be:
```
(conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
  !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) ||
  Utils.isPushBasedShuffleEnabled(conf)
```
If yes, please add a UT as well to check that `hostLocalDirManager` is initialized when just push-based shuffle is enabled. I think we have this UT internally.
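A rough sketch of that UT, assuming the suite's `conf` and a `makeBlockManager` helper like the one in `BlockManagerSuite`, that `hostLocalDirManager` is exposed as an `Option`, and that `spark.shuffle.push.enabled` plus the external shuffle service are enough for `Utils.isPushBasedShuffleEnabled` to return true:
```
test("hostLocalDirManager is initialized when only push-based shuffle is enabled") {
  // Host-local disk reading stays disabled on purpose; only push-based shuffle is on.
  conf.set("spark.shuffle.push.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.shuffle.readHostLocalDisk", "false")
  val bm = makeBlockManager(8000, "executor1")
  assert(bm.hostLocalDirManager.isDefined)
}
```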
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +198,60 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
}
}
+ /**
+ * Get the list of configured local dirs storing merged shuffle blocks created by executors
+ * if push based shuffle is enabled. Note that the files in this directory will be created
+ * by the external shuffle services. We only create the merge_manager directories and
+ * subdirectories here because currently the shuffle service doesn't have permission to
+ * create directories under application local directories.
+ */
+ private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Array[File] = {
Review comment:
In our latest version it doesn't return anything.
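Roughly this shape (just a sketch; the internals are taken from the diff, error handling is elided, and `Utils.createDirWithCustomizedPermission` is the helper added to Utils.scala in this PR):
```
private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Unit = {
  if (!Utils.isPushBasedShuffleEnabled(conf)) {
    return
  }
  // If any local dir already has a merge_manager dir, another executor created it; nothing to do.
  if (configuredLocalDirs.exists(dir => new File(dir, mergeDirName).exists())) {
    return
  }
  configuredLocalDirs.foreach { rootDir =>
    val mergeDir = new File(rootDir, mergeDirName)
    logDebug(s"Creating $mergeDir and its sub dirs since the merge dir does not exist")
    for (dirNum <- 0 until subDirsPerLocalDir) {
      val subDir = new File(mergeDir, "%02x".format(dirNum))
      // The "770" here is illustrative; the diff currently hard-codes -m770 in the mkdir call.
      Utils.createDirWithCustomizedPermission(subDir, "770")
    }
  }
}
```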
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -149,6 +178,12 @@ object BlockId {
ShuffleIndexBlockId(shuffleId.toInt, mapId.toLong, reduceId.toInt)
case SHUFFLE_PUSH(shuffleId, mapIndex, reduceId) =>
ShufflePushBlockId(shuffleId.toInt, mapIndex.toInt, reduceId.toInt)
+ case SHUFFLE_MERGED(appId, shuffleId, reduceId) =>
+ ShuffleMergedBlockId(appId, shuffleId.toInt, reduceId.toInt)
+ case SHUFFLE_MERGED_INDEX(appId, shuffleId, reduceId) =>
+ ShuffleMergedIndexBlockId(appId, shuffleId.toInt, reduceId.toInt)
+ case SHUFFLE_MERGED_META(appId, shuffleId, reduceId) =>
+ ShuffleMergedMetaBlockId(appId, shuffleId.toInt, reduceId.toInt)
Review comment:
Same here. Are we using this anywhere?
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -129,6 +155,9 @@ object BlockId {
val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
val SHUFFLE_PUSH = "shufflePush_([0-9]+)_([0-9]+)_([0-9]+)".r
+ val SHUFFLE_MERGED = "shuffleMerged_([_A-Za-z0-9]*)_([0-9]+)_([0-9]+).data".r
+ val SHUFFLE_MERGED_INDEX = "shuffleMerged_([_A-Za-z0-9]*)_([0-9]+)_([0-9]+).index".r
+ val SHUFFLE_MERGED_META = "shuffleMerged_([_A-Za-z0-9]*)_([0-9]+)_([0-9]+).meta".r
Review comment:
Where are we using this?
##########
File path: core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
##########
@@ -85,6 +86,39 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
assert(diskBlockManager.getAllBlocks().isEmpty)
}
+ test("find active merged shuffle directories") {
Review comment:
This is a stale UT. It's not needed.
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -315,6 +315,38 @@ private[spark] object Utils extends Logging {
dir.getCanonicalFile
}
+ /**
+ * Create a directory that is writable by the group.
+ * Grant the customized permission so the shuffle server can
+ * create subdirs/files within the merge folder.
+ */
+ def createDirWithCustomizedPermission(dirToCreate: File, permission: String): Unit = {
+ var attempts = 0
+ val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
+ var created: File = null
+ while (created == null) {
+ attempts += 1
+ if (attempts > maxAttempts) {
+ throw new IOException(
+ s"Failed to create directory ${dirToCreate.getAbsolutePath} after " +
+ s"${maxAttempts} attempts!")
+ }
+ try {
+ val builder = new ProcessBuilder().command(
+ "mkdir", "-p", "-m770", dirToCreate.getAbsolutePath)
Review comment:
`permission` variable is not being used.
Also, we should still keep the TODO here:
```
* TODO: Find out why can't we create a dir using java api with permission 770
* Files.createDirectories(mergeDir.toPath, PosixFilePermissions.asFileAttribute(
* PosixFilePermissions.fromString("rwxrwx---"))
```
Also, I think we have generalized this createDir method, but for more restrictive permissions the java api would work.
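For reference, the java api route the TODO mentions would look roughly like this for a more restrictive permission: create the dir first, then set the exact bits with `setPosixFilePermissions`, which is not masked by the process umask. The path and permission string below are only examples:
```
import java.nio.file.{Files, Paths}
import java.nio.file.attribute.PosixFilePermissions

val dir = Paths.get("/tmp/merge_manager_example")
Files.createDirectories(dir)
// chmod-style call: applies exactly rwx------ regardless of the process umask.
Files.setPosixFilePermissions(dir, PosixFilePermissions.fromString("rwx------"))
```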
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +198,60 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
}
}
+ /**
+ * Get the list of configured local dirs storing merged shuffle blocks created by executors
+ * if push based shuffle is enabled. Note that the files in this directory will be created
+ * by the external shuffle services. We only create the merge_manager directories and
+ * subdirectories here because currently the shuffle service doesn't have permission to
+ * create directories under application local directories.
+ */
+ private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Array[File] = {
+ if (Utils.isPushBasedShuffleEnabled(conf)) {
+ // Will create the merge_manager directory only if it doesn't exist under any local dir.
+ for (rootDir <- configuredLocalDirs) {
+ val mergeDir = new File(rootDir, mergeDirName)
+ if (mergeDir.exists()) {
+ logDebug(s"Not creating $mergeDir as it already exists")
+ return Array.empty[File]
+ }
+ }
+ for (rootDir <- configuredLocalDirs) {
+ val mergeDir = new File(rootDir, mergeDirName)
+ if (!mergeDir.exists()) {
+ logDebug(s"Creating $mergeDir as it does not exist")
+ // This executor didn't see merge_manager in the local dir, it will start creating them.
+ // It's possible that the other executors launched at the same time may also reach here
+ // but we are working on the assumption that the executors launched around the same time
+ // will have the same set of application local directories.
+ try {
+ for (dirNum <- 0 until subDirsPerLocalDir) {
+ // Only one container will create this directory. The filesystem will handle any race
+ // conditions.
+ val sudDir = new File(mergeDir, "%02x".format(dirNum))
Review comment:
Should we not check the existence of the subdir before trying to create
it?
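i.e. something like this (sketch; it assumes the creation call that follows in the PR is the new `Utils.createDirWithCustomizedPermission` helper):
```
val subDir = new File(mergeDir, "%02x".format(dirNum))
if (!subDir.exists()) {
  Utils.createDirWithCustomizedPermission(subDir, "770")
}
```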
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -728,6 +736,27 @@ private[spark] class BlockManager(
}
}
+ /**
+ * Get the local merged shuffle block data for the given block ID as multiple chunks.
+ * A merged shuffle file is divided into multiple chunks according to the index file.
+ * Instead of reading the entire file as a single block, we split it into smaller chunks
+ * which will be memory efficient when performing certain operations.
+ */
+ def getLocalMergedBlockData(
+ blockId: ShuffleBlockId,
+ dirs: Array[String]): Seq[ManagedBuffer] = {
+ shuffleManager.shuffleBlockResolver.getMergedBlockData(blockId, Some(dirs))
+ }
+
+ /**
+ * Get the local merged shuffle block meta data for the given block ID.
+ */
+ def geLocalMergedBlockMeta(
Review comment:
Same here, `getMergedBlockMeta`?
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +198,60 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
}
}
+ /**
+ * Get the list of configured local dirs storing merged shuffle blocks created by executors
+ * if push based shuffle is enabled. Note that the files in this directory will be created
+ * by the external shuffle services. We only create the merge_manager directories and
+ * subdirectories here because currently the shuffle service doesn't have permission to
+ * create directories under application local directories.
+ */
+ private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Array[File] = {
+ if (Utils.isPushBasedShuffleEnabled(conf)) {
+ // Will create the merge_manager directory only if it doesn't exist under any local dir.
+ for (rootDir <- configuredLocalDirs) {
+ val mergeDir = new File(rootDir, mergeDirName)
+ if (mergeDir.exists()) {
+ logDebug(s"Not creating $mergeDir as it already exists")
+ return Array.empty[File]
+ }
+ }
+ for (rootDir <- configuredLocalDirs) {
+ val mergeDir = new File(rootDir, mergeDirName)
+ if (!mergeDir.exists()) {
+ logDebug(s"Creating $mergeDir as it does not exist")
Review comment:
logDebug(s"Creating $mergeDir and its subdirs since the merge dir does
not exist")
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -52,6 +59,17 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
// of subDirs(i) is protected by the lock of subDirs(i)
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
+ // Get merge directory name, append attemptId if there is any
+ private val mergeDirName =
+ conf.getOption("spark.app.attempt.id")
+ .map(id => MERGE_DIRECTORY + "_" + id).getOrElse(MERGE_DIRECTORY)
+
+ /**
+ * Create merge directories
+ */
+ private[spark] val activeMergedShuffleDirs: Array[File] =
+ createLocalDirsForMergedShuffleBlocks(conf)
Review comment:
Can you please check the signature of this method again? We don't need
to keep the dirs in `DiskBlockManager` instance memory. We don't even use the
`activeMergedShuffleDirs` anywhere in this class. This still looks like stale
code.
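i.e. the val could just become a side-effecting call during initialization (sketch, consistent with the `Unit` signature suggested earlier):
```
// No field needed; create the merge_manager dirs purely for their side effect.
createLocalDirsForMergedShuffleBlocks(conf)
```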