otterc commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r615456879



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,29 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
   override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
 }
 
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) extends BlockId {
+  override def name: String = "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".data"
+}
+
+@DeveloperApi
+case class ShuffleMergedIndexBlockId(
+  appId: String,
+  shuffleId: Int,
+  reduceId: Int) extends BlockId {
+  override def name: String =
+    "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".index"

Review comment:
      This is not going to be simple. If we append `UUID.randomUUID` to the merge dir path, there are 2 major issues:
   1. We want a single merge directory under each application's local dir. When a new executor starts, it checks whether `merge_manager` exists and creates it only if it doesn't. If a random UUID is attached, how would a new executor know when **not** to create a new directory? Block manager dirs have no such requirement: each executor creates its own.
   
   2. How would the server know the exact name of a `merge_manager` directory that carries a random UUID? The server currently constructs the `merge_manager` path from the `local_dirs` in `ExecutorShuffleInfo`: it takes the parent dir of the block manager `local_dirs` and assumes that `merge_manager` exists under it. The code is here:
   
https://github.com/apache/spark/blob/12abfe79173f6ab00b3341f3b31cad5aa26aa6e4/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java#L1003
   This would require sending the `merge_manager` directory name to the server as well, which means updating the `RegisterExecutor` message. Since we can't change an existing message, we would have to create a new one.
   
   I still think we can ignore this edge case, because if the merged files are lost the client falls back to the original blocks. There are 2 cases:
   1. The server hasn't finalized the shuffle yet when the merge directory is deleted. When the shuffle is finalized, the files are missing, so the partition is not considered merged and the original shuffle blocks are fetched.
   2. The server has already finalized the shuffle and the merge directory is deleted afterwards. When the client fetches the merged partition, the server responds with a failure, and the client falls back to the original un-merged blocks.
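   A simplified Scala model of the two fallback cases, assuming the merged data for a reduce partition lives in a single file on the server's disk. `FallbackSketch` and its methods are hypothetical stand-ins for the real finalize and fetch paths; they only show why a deleted file always ends in a fetch of the original blocks.

```scala
import java.nio.file.{Files, Path}
import scala.util.Try

// Hypothetical model; not Spark's actual finalize/fetch code.
object FallbackSketch {
  sealed trait Source
  case object MergedBlock extends Source
  case object OriginalBlocks extends Source

  // Case 1 (hypothetical helper): at finalization, a reduce partition is
  // reported as merged only if its merged data file is still on disk.
  def isMergedAtFinalize(mergedDataFile: Path): Boolean =
    Files.exists(mergedDataFile)

  // Case 2 (hypothetical helper): the server serves the merged block; a file
  // deleted after finalization surfaces as a failed read.
  def serveMerged(mergedDataFile: Path): Try[Array[Byte]] =
    Try(Files.readAllBytes(mergedDataFile))

  // Client side: use the merged block only if the partition was reported as
  // merged and the fetch succeeds; otherwise fetch the original map outputs.
  def chooseSource(isMerged: Boolean, mergedDataFile: Path): Source =
    if (isMerged && serveMerged(mergedDataFile).isSuccess) MergedBlock
    else OriginalBlocks
}
```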
   
   @zhouyejoe @mridulm Let me know what you think






