Ngone51 commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r648261185
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +185,82 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
}
}
+ /**
+ * Get the list of configured local dirs storing merged shuffle blocks created by executors
+ * if push based shuffle is enabled. Note that the files in this directory will be created
+ * by the external shuffle services. We only create the merge_manager directories and
+ * subdirectories here because currently the external shuffle service doesn't have
+ * permission to create directories under application local directories.
+ */
+ private def createLocalDirsForMergedShuffleBlocks(): Unit = {
+ if (Utils.isPushBasedShuffleEnabled(conf)) {
+ // Will create the merge_manager directory only if it doesn't exist under the local dir.
+ Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
+ try {
+ val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+ if (!mergeDir.exists()) {
+ // This executor does not find merge_manager directory, it will try to create
+ // the merge_manager directory and the sub directories.
+ logDebug(s"Try to create $mergeDir and its sub dirs since the " +
+ s"merge_manager dir does not exist")
Review comment:
nit: use `MERGE_MANAGER_DIR`
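A minimal sketch of that nit, interpolating the existing constant instead of repeating the literal directory name in the message:

```scala
// Reuse MERGE_MANAGER_DIR so the log message cannot drift from the
// actual directory name used a few lines above.
logDebug(s"Try to create $mergeDir and its sub dirs since the " +
  s"$MERGE_MANAGER_DIR dir does not exist")
```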
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,32 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
 override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
}
+@Since("3.2.0")
+@DeveloperApi
+case class ShuffleMergedDataBlockId(appId: String, shuffleId: Int, reduceId:
Int) extends BlockId {
+ override def name: String = "shuffleMerged_" + appId + "_" + shuffleId + "_"
+ reduceId + ".data"
+}
+
+@Since("3.2.0")
+@DeveloperApi
+case class ShuffleMergedIndexBlockId(
+ appId: String,
+ shuffleId: Int,
+ reduceId: Int) extends BlockId {
+ override def name: String =
+ "shuffleMerged_" + appId + "_" + shuffleId + "_" + reduceId + ".index"
Review comment:
Shall we reuse `RemoteBlockPushResolver.MERGED_SHUFFLE_FILE_NAME_PREFIX`?
(same for `ShuffleMergedMetaBlockId`, `ShuffleMergedDataBlockId`)
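A sketch of what reusing the constant could look like, assuming `RemoteBlockPushResolver.MERGED_SHUFFLE_FILE_NAME_PREFIX` holds the `shuffleMerged` prefix and is reachable from `core` (neither is confirmed in this thread):

```scala
import org.apache.spark.network.shuffle.RemoteBlockPushResolver

@Since("3.2.0")
@DeveloperApi
case class ShuffleMergedIndexBlockId(
    appId: String,
    shuffleId: Int,
    reduceId: Int) extends BlockId {
  // Building the name from the shared prefix keeps block IDs and merged
  // shuffle file names from drifting apart.
  override def name: String =
    RemoteBlockPushResolver.MERGED_SHUFFLE_FILE_NAME_PREFIX + "_" + appId +
      "_" + shuffleId + "_" + reduceId + ".index"
}
```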
##########
File path: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
##########
@@ -161,4 +166,78 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
val resolver = new IndexShuffleBlockResolver(conf, blockManager)
assert(resolver.getMigrationBlocks(ShuffleBlockInfo(Int.MaxValue, Long.MaxValue)).isEmpty)
}
+
+ test("getMergedBlockData should return expected FileSegmentManagedBuffer
list") {
+ val shuffleId = 1
+ val reduceId = 1
+ val dataFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.data"
+ val dataFile = new File(tempDir.getAbsolutePath, dataFileName)
+ val out = new FileOutputStream(dataFile)
+ Utils.tryWithSafeFinally {
+ out.write(new Array[Byte](30))
+ } {
+ out.close()
+ }
+ val indexFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.index"
+ prepareMergedShuffleIndexFile(indexFileName)
+ val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+ val dirs = Some(Array[String](tempDir.getAbsolutePath))
+ val managedBufferList =
+ resolver.getMergedBlockData(ShuffleBlockId(shuffleId, -1, reduceId), dirs)
Review comment:
nit: use `SHUFFLE_PUSH_MAP_ID` instead of `-1`?
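A sketch of how that nit could read; `SHUFFLE_PUSH_MAP_ID` is taken here as the named `-1` sentinel the reviewer means, and its owning object is not shown in this thread, so the usage below is an assumption:

```scala
// SHUFFLE_PUSH_MAP_ID (assumed to equal -1) documents that merged shuffle
// blocks carry no real map index, which a bare -1 does not.
val managedBufferList =
  resolver.getMergedBlockData(ShuffleBlockId(shuffleId, SHUFFLE_PUSH_MAP_ID, reduceId), dirs)
```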
##########
File path: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
##########
@@ -161,4 +166,78 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
val resolver = new IndexShuffleBlockResolver(conf, blockManager)
assert(resolver.getMigrationBlocks(ShuffleBlockInfo(Int.MaxValue, Long.MaxValue)).isEmpty)
}
+
+ test("getMergedBlockData should return expected FileSegmentManagedBuffer
list") {
+ val shuffleId = 1
+ val reduceId = 1
+ val dataFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.data"
+ val dataFile = new File(tempDir.getAbsolutePath, dataFileName)
+ val out = new FileOutputStream(dataFile)
+ Utils.tryWithSafeFinally {
+ out.write(new Array[Byte](30))
+ } {
+ out.close()
+ }
+ val indexFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.index"
+ prepareMergedShuffleIndexFile(indexFileName)
+ val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+ val dirs = Some(Array[String](tempDir.getAbsolutePath))
+ val managedBufferList =
+ resolver.getMergedBlockData(ShuffleBlockId(shuffleId, -1, reduceId), dirs)
+ assert(managedBufferList.size === 3)
+ assert(managedBufferList(0).size === 10)
+ assert(managedBufferList(1).size === 0)
+ assert(managedBufferList(2).size === 20)
+ }
+
+ test("getMergedBlockMeta should return expected MergedBlockMeta") {
+ val shuffleId = 1
+ val reduceId = 1
+ val metaFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.meta"
+ val metaFile = new File(tempDir.getAbsolutePath, metaFileName)
+ val chunkTracker = new RoaringBitmap()
+ chunkTracker.add(1)
+ chunkTracker.add(2)
+ val metaFileOutputStream = new FileOutputStream(metaFile)
+ val outMeta = new DataOutputStream(metaFileOutputStream)
+ Utils.tryWithSafeFinally {
+ chunkTracker.serialize(outMeta)
+ chunkTracker.clear()
+ chunkTracker.add(3)
+ chunkTracker.add(4)
+ chunkTracker.serialize(outMeta)
+ chunkTracker.clear()
+ chunkTracker.add(5)
+ chunkTracker.add(6)
+ chunkTracker.serialize(outMeta)
+ } {
+ outMeta.close()
+ }
+ val indexFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.index"
+ prepareMergedShuffleIndexFile(indexFileName)
+ val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+ val dirs = Some(Array[String](tempDir.getAbsolutePath))
+ val mergedBlockMeta =
+ resolver.getMergedBlockMeta(ShuffleBlockId(shuffleId, -1, reduceId), dirs)
+ assert(mergedBlockMeta.getNumChunks === 3)
+ assert(mergedBlockMeta.readChunkBitmaps().size === 3)
+ assert(mergedBlockMeta.readChunkBitmaps()(0).contains(1))
+ assert(mergedBlockMeta.readChunkBitmaps()(0).contains(2))
Review comment:
Shall we check the other two chunks?
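A sketch of the follow-up assertions, mirroring the bitmaps serialized in the test above (chunk 1 was written with {3, 4}, chunk 2 with {5, 6}); reading the bitmaps once also avoids re-parsing the meta file per assert:

```scala
val bitmaps = mergedBlockMeta.readChunkBitmaps()
// Chunk 1 was serialized with map indices 3 and 4.
assert(bitmaps(1).contains(3) && bitmaps(1).contains(4))
// Chunk 2 was serialized with map indices 5 and 6.
assert(bitmaps(2).contains(5) && bitmaps(2).contains(6))
```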
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +185,82 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
}
}
+ /**
+ * Get the list of configured local dirs storing merged shuffle blocks created by executors
+ * if push based shuffle is enabled. Note that the files in this directory will be created
+ * by the external shuffle services. We only create the merge_manager directories and
+ * subdirectories here because currently the external shuffle service doesn't have
+ * permission to create directories under application local directories.
+ */
+ private def createLocalDirsForMergedShuffleBlocks(): Unit = {
+ if (Utils.isPushBasedShuffleEnabled(conf)) {
+ // Will create the merge_manager directory only if it doesn't exist under the local dir.
+ Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
+ try {
+ val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+ if (!mergeDir.exists()) {
+ // This executor does not find merge_manager directory, it will try to create
+ // the merge_manager directory and the sub directories.
+ logDebug(s"Try to create $mergeDir and its sub dirs since the " +
+ s"merge_manager dir does not exist")
+ for (dirNum <- 0 until subDirsPerLocalDir) {
+ val subDir = new File(mergeDir, "%02x".format(dirNum))
+ if (!subDir.exists()) {
+ // Only one container will create this directory. The filesystem will handle
+ // any race conditions.
+ createDirWithCustomizedPermission(subDir, "770")
+ }
+ }
+ }
+ logInfo(s"Merge directory and its sub dirs get created at $mergeDir")
+ } catch {
+ case e: IOException =>
+ logError(
+ s"Failed to create merge dir in $rootDir. Ignoring this directory.", e)
Review comment:
nit: "merge" -> "merge_manager"
##########
File path: core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala
##########
@@ -133,4 +133,13 @@ class HostLocalShuffleReadingSuite extends SparkFunSuite with Matchers with Loca
assert(remoteBytesRead.sum === 0 && remoteBlocksFetched.sum === 0)
}
}
+
+ test("Enable host local shuffle reading when Push based shuffle is enabled")
{
Review comment:
nit: "Push" -> "push"
##########
File path: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
##########
@@ -161,4 +166,78 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
val resolver = new IndexShuffleBlockResolver(conf, blockManager)
assert(resolver.getMigrationBlocks(ShuffleBlockInfo(Int.MaxValue, Long.MaxValue)).isEmpty)
}
+
+ test("getMergedBlockData should return expected FileSegmentManagedBuffer
list") {
+ val shuffleId = 1
+ val reduceId = 1
+ val dataFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.data"
+ val dataFile = new File(tempDir.getAbsolutePath, dataFileName)
+ val out = new FileOutputStream(dataFile)
+ Utils.tryWithSafeFinally {
+ out.write(new Array[Byte](30))
+ } {
+ out.close()
+ }
+ val indexFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.index"
+ prepareMergedShuffleIndexFile(indexFileName)
+ val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+ val dirs = Some(Array[String](tempDir.getAbsolutePath))
+ val managedBufferList =
+ resolver.getMergedBlockData(ShuffleBlockId(shuffleId, -1, reduceId), dirs)
+ assert(managedBufferList.size === 3)
+ assert(managedBufferList(0).size === 10)
+ assert(managedBufferList(1).size === 0)
+ assert(managedBufferList(2).size === 20)
+ }
+
+ test("getMergedBlockMeta should return expected MergedBlockMeta") {
+ val shuffleId = 1
+ val reduceId = 1
+ val metaFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.meta"
+ val metaFile = new File(tempDir.getAbsolutePath, metaFileName)
+ val chunkTracker = new RoaringBitmap()
+ chunkTracker.add(1)
+ chunkTracker.add(2)
+ val metaFileOutputStream = new FileOutputStream(metaFile)
+ val outMeta = new DataOutputStream(metaFileOutputStream)
+ Utils.tryWithSafeFinally {
+ chunkTracker.serialize(outMeta)
+ chunkTracker.clear()
+ chunkTracker.add(3)
+ chunkTracker.add(4)
+ chunkTracker.serialize(outMeta)
+ chunkTracker.clear()
+ chunkTracker.add(5)
+ chunkTracker.add(6)
+ chunkTracker.serialize(outMeta)
+ } {
+ outMeta.close()
+ }
+ val indexFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.index"
+ prepareMergedShuffleIndexFile(indexFileName)
+ val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+ val dirs = Some(Array[String](tempDir.getAbsolutePath))
+ val mergedBlockMeta =
+ resolver.getMergedBlockMeta(ShuffleBlockId(shuffleId, -1, reduceId), dirs)
Review comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]