otterc commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r644834562
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -315,6 +315,41 @@ private[spark] object Utils extends Logging {
dir.getCanonicalFile
}
+ /**
+ * Create a directory that is writable by the group.
+ * Grant the customized permission so the shuffle server can
+ * create subdirs/files within the merge folder.
Review comment:
@Ngone51 @zhouyejoe How about moving this function to DiskBlockManager
completely? For permissions more restrictive than `750` we can use the Java API.
Here, we are running the `mkdir` command just because we want to create the
merge directory with `770` permissions.
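For reference, a minimal sketch of creating a directory with mode `770` via `java.nio.file` instead of shelling out to `mkdir`. The helper name is hypothetical and this assumes a POSIX filesystem; it is not the PR's implementation:
```
import java.io.File
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermissions

// Hypothetical helper: create `dir` (and parents) and set mode 770,
// i.e. owner and group rwx. Requires a POSIX-compliant filesystem.
def createDirWith770(dir: File): Unit = {
  Files.createDirectories(dir.toPath)
  Files.setPosixFilePermissions(dir.toPath,
    PosixFilePermissions.fromString("rwxrwx---"))
}
```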
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2566,11 +2601,28 @@ private[spark] object Utils extends Logging {
}
/**
- * Push based shuffle can only be enabled when external shuffle service is enabled.
+ * Push based shuffle can only be enabled when the application is submitted
+ * to run in YARN mode, with external shuffle service enabled and
+ * spark.yarn.maxAttempts or the yarn cluster default max attempts is set to 1.
+ * TODO: SPARK-35546 Support push based shuffle with multiple app attempts
*/
def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
conf.get(PUSH_BASED_SHUFFLE_ENABLED) &&
- (conf.get(IS_TESTING).getOrElse(false) || conf.get(SHUFFLE_SERVICE_ENABLED))
+ (conf.get(IS_TESTING).getOrElse(false) ||
+ (conf.get(SHUFFLE_SERVICE_ENABLED) &&
+ conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn") &&
Review comment:
Nit: is the String "yarn" declared as a constant anywhere else that we
can use?
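If no existing constant fits, a hedged sketch of factoring the literal out (the object and constant names here are hypothetical, not from Spark):
```
// Hypothetical home for the constant; Spark may not expose one for the
// "yarn" master string, so where it lives is a design choice.
object MasterNames {
  val YARN = "yarn"
}

// Usage inside isPushBasedShuffleEnabled:
// conf.get(SparkLauncher.SPARK_MASTER, null) == MasterNames.YARN
```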
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -504,7 +504,8 @@ private[spark] class BlockManager(
hostLocalDirManager = {
if (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
- !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
+ !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL) ||
+ Utils.isPushBasedShuffleEnabled(conf)) {
Review comment:
This still looks the same @zhouyejoe. `hostLocalDirManager` should be
set when push-based shuffle is enabled. Can you put `()` around
`conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
!conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)` so that it is
clear.
It should be:
```
if ((conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
    !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) ||
    Utils.isPushBasedShuffleEnabled(conf))
```
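Worth noting: in Scala, `&&` binds tighter than `||`, so the added parentheses make the existing grouping explicit without changing evaluation. A tiny standalone illustration (variable names invented):
```
// (a && b) || c is how `a && b || c` already parses; parens aid readability only.
val a = true; val b = false; val c = true
assert((a && b || c) == ((a && b) || c))
```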
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,32 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
}
+@Since("3.2.0")
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) extends BlockId {
Review comment:
nit: Change it to `ShuffleMergedDataBlockId`
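A hedged sketch of the suggested rename, keeping the signature currently in the PR; the `name` format shown is an assumption by analogy with `ShufflePushBlockId` above, not necessarily what the PR produces:
```
// Sketch only: the suggested ShuffleMergedDataBlockId with the PR's current fields.
@Since("3.2.0")
@DeveloperApi
case class ShuffleMergedDataBlockId(appId: String, shuffleId: Int, reduceId: Int)
  extends BlockId {
  // Assumed naming scheme; the ".data" suffix distinguishes it from the index file.
  override def name: String =
    "shuffleMerged_" + appId + "_" + shuffleId + "_" + reduceId + ".data"
}
```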
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +185,45 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
}
}
+ /**
+ * Get the list of configured local dirs storing merged shuffle blocks created by executors
+ * if push based shuffle is enabled. Note that the files in this directory will be created
+ * by the external shuffle services. We only create the merge_manager directories and
+ * subdirectories here because currently the shuffle service doesn't have permission to
+ * create directories under application local directories.
+ */
+ private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Unit = {
+ if (Utils.isPushBasedShuffleEnabled(conf)) {
+ // Will create the merge_manager directory only if it doesn't exist under any local dir.
+ Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
+ try {
+ val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+ // This executor does not find merge_manager directory, it will start creating them.
+ // It's possible that the other executors launched at the same time may also reach here
+ // but we are working on the assumption that the executors launched around the same time
+ // will have the same set of application local directories.
Review comment:
This comment needs to be updated as well; we are no longer working under this assumption:
`but we are working on the assumption that the executors launched around the same time will have the same set of application local directories.`
Also, move the comment inside the `if` block below.
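A minimal sketch of what a race-tolerant version might look like once that assumption is dropped (standalone helper; everything except the `merge_manager` name is hypothetical, and this is not the PR's code):
```
import java.io.{File, IOException}

// Hypothetical helper: create the merge_manager directory only if absent,
// tolerating another executor creating it concurrently.
def ensureMergeDir(rootDir: String, mergeDirName: String): Unit = {
  val mergeDir = new File(rootDir, mergeDirName)
  if (!mergeDir.exists()) {
    // mkdirs() can return false if a concurrent executor won the race;
    // re-checking isDirectory treats that case as success.
    if (!mergeDir.mkdirs() && !mergeDir.isDirectory) {
      throw new IOException(s"Failed to create merge directory $mergeDir")
    }
  }
}
```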
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -728,6 +736,27 @@ private[spark] class BlockManager(
}
}
+ /**
+ * Get the local merged shuffle block data for the given block ID as multiple chunks.
+ * A merged shuffle file is divided into multiple chunks according to the index file.
+ * Instead of reading the entire file as a single block, we split it into smaller chunks
+ * which will be memory efficient when performing certain operations.
+ */
+ def getLocalMergedBlockData(
Review comment:
ok
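To make the chunking concrete: a hedged sketch of deriving chunk boundaries from an index file, assuming the usual Spark shuffle-index layout of consecutive big-endian Long offsets (the helper and layout are assumptions, not the PR's code):
```
import java.io.{DataInputStream, File, FileInputStream}

// Read all Long offsets from an index file; chunk i then spans bytes
// [offsets(i), offsets(i + 1)) of the merged data file.
def readChunkOffsets(indexFile: File): Array[Long] = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    val numOffsets = (indexFile.length() / 8).toInt
    Array.fill(numOffsets)(in.readLong())
  } finally {
    in.close()
  }
}
```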
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -139,7 +171,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
* located inside configured local directories and won't
* be deleted on JVM exit when using the external shuffle service.
*/
- private def createLocalDirs(conf: SparkConf): Array[File] = {
+ private def createLocalDirs(): Array[File] = {
Review comment:
Nit: is this change really required? This is original code so why not
just leave it as it is?
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -40,7 +42,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
* having really large inodes at the top level. */
- private[spark] val localDirs: Array[File] = createLocalDirs(conf)
+ private[spark] val localDirs: Array[File] = createLocalDirs()
Review comment:
Nit: this change seems unrelated to push-based shuffle so should we not
make it?