otterc commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r615456879
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,29 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
}
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) extends BlockId {
+ override def name: String = "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".data"
+}
+
+@DeveloperApi
+case class ShuffleMergedIndexBlockId(
+ appId: String,
+ shuffleId: Int,
+ reduceId: Int) extends BlockId {
+ override def name: String =
+ "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".index"
Review comment:
This is not going to be simple. If we append `UUID.randomUUID()` to the
merge dir path, there are 2 major issues:
1. We want a single merge directory under each application's local dir. When
a new executor starts, it checks whether the merge_manager dir exists and
creates it only if it doesn't. If a random UUID is attached, how would a new
executor know when **not** to create a new directory? This is not a
requirement for blockMgr dirs, where each executor creates its own directory.
2. How would the server know the exact name of a merge_manager directory that
has a random UUID? The server currently constructs the merge_manager path
from the `local_dirs` in `ExecutorShuffleInfo`: it finds the parent dir of the
block manager `local_dirs` and assumes that `merge_manager` exists under it.
The code is here:
https://github.com/apache/spark/blob/12abfe79173f6ab00b3341f3b31cad5aa26aa6e4/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java#L1003
A random name would require also sending the merge_manager directory name to
the server, which means updating the `RegisterExecutor` message. Since we
can't change an existing message, we would have to create a new one.
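To make both points concrete, here is a minimal Scala sketch of why a fixed directory name works without any extra coordination. It assumes the layout described above, where the block manager dirs (e.g. `blockmgr-*`) and the single `merge_manager` dir share the same application-level parent; the names `MergeDirSketch`, `ensureMergeDir`, `mergeDirFromLocalDir`, and `randomMergeDir` are hypothetical, and this is not the actual `RemoteBlockPushResolver` code.

```scala
import java.nio.file.{Files, Path, Paths}

// Hypothetical sketch of the fixed-name convention; only the directory name
// "merge_manager" comes from the discussion, everything else is illustrative.
object MergeDirSketch {
  val MergeDirName = "merge_manager"

  // Server side (assumed layout): take one of the executor's block manager
  // local dirs, go to its parent, and append the fixed merge dir name.
  def mergeDirFromLocalDir(blockMgrDir: String): Path =
    Paths.get(blockMgrDir).getParent.resolve(MergeDirName)

  // Executor side: create the merge dir only if it is not there yet.
  // Files.createDirectories does not fail when the directory already exists,
  // so every executor of the application ends up using the same directory.
  def ensureMergeDir(appLocalDir: Path): Path =
    Files.createDirectories(appLocalDir.resolve(MergeDirName))

  // The alternative under discussion: a random per-executor suffix. Every
  // call produces a different name, so neither other executors nor the
  // server can reconstruct it without being told explicitly.
  def randomMergeDir(appLocalDir: Path): Path =
    appLocalDir.resolve(MergeDirName + "_" + java.util.UUID.randomUUID())
}
```

With the fixed name, repeated `ensureMergeDir` calls return the same path, and the server can recompute that path from any block manager dir; the random-suffix variant loses both properties, which is why it would need the extra message.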
I still think we can ignore this edge case, because if the merged files are
lost, there is a fallback to the original blocks. There are 2 cases:
1. The server hasn't finalized the shuffle yet when the merged folder is
deleted. When the shuffle is finalized, the files are not there, so the
partition will not be considered merged and the original shuffle blocks will
be fetched.
2. The server has already finalized the shuffle and the merged folder is
deleted afterwards. When the client fetches the merged partition, the server
will respond with a failure, which causes the client to fall back to the
original un-merged blocks.
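The two fallback paths can be sketched in Scala as follows. This is a minimal model, assuming that finalization treats a partition as merged only when both its `.data` and `.index` files exist (the suffixes used by `ShuffleMergedBlockId` and `ShuffleMergedIndexBlockId` in the diff) and modeling fetches as plain functions; `MergeFallbackSketch`, `isMergedAtFinalize`, and `fetchPartition` are hypothetical names, not Spark APIs.

```scala
import java.nio.file.{Files, Path}
import scala.util.{Failure, Success, Try}

// Hypothetical model of the two fallback cases; not Spark's actual classes.
object MergeFallbackSketch {
  // Case 1 (assumed check): at finalization, a partition counts as merged
  // only if its merged data and index files still exist on disk.
  def isMergedAtFinalize(dataFile: Path, indexFile: Path): Boolean =
    Files.exists(dataFile) && Files.exists(indexFile)

  // Case 2: the client tries the merged partition first; if the server
  // fails the request (e.g. the files were deleted), the client falls back
  // to fetching the original un-merged blocks.
  def fetchPartition[T](fetchMerged: () => T, fetchOriginal: () => T): T =
    Try(fetchMerged()) match {
      case Success(data) => data
      case Failure(_)    => fetchOriginal()
    }
}
```

In both cases a lost merge folder only costs the optimization; correctness is preserved because the original blocks remain the source of truth.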
@zhouyejoe @mridulm Let me know what you think.
--
This is an automated message from the Apache Git Service.