zhouyejoe commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r618802072



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,29 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
   override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
 }
 
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) extends BlockId {
+  override def name: String = "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".data"
+}
+
+@DeveloperApi
+case class ShuffleMergedIndexBlockId(
+  appId: String,
+  shuffleId: Int,
+  reduceId: Int) extends BlockId {
+  override def name: String =
+    "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".index"

Review comment:
   Thanks for the review and discussion on this issue. We have agreed that the current implementation will cause problems when the application has multiple attempts.
   **Issue description:**
   Currently, the merge dir is created by the executors under the application's local tmp dir. For example, for application_12345, executor1 running on Node1 creates merge_dir under /[yarn-local-dirs]/usercache/[username]/appcache/application_12345/merge_dir. Other executors of this application running on Node1 will not create the dir again, since it already exists. When executor1 registers itself with the shuffle service on Node1, it also registers its local dirs, which tell the shuffle service where the unmerged shuffle data is stored. In this case, it registers /[yarn-local-dirs]/usercache/[username]/appcache/application_12345/blockmgr_RandomID. The external shuffle service (ESS) uses these dirs and assumes the merge dirs have been created under /[yarn-local-dirs]/usercache/[username]/appcache/application_12345/merge_dir. Among the executor registration messages from the same application, only the first one registers the merge dirs with the shuffle service; the others are ignored.
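   A minimal Scala sketch of the "first registration wins" behavior described above, assuming a simplified in-memory model. `ExecutorRegistration`, `MergeDirRegistry`, and `mergeDirFor` are hypothetical names, not Spark's actual shuffle service classes, and the sketch only looks at the first registered local dir.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified registration message. Like the current protocol,
// it carries no attempt ID.
final case class ExecutorRegistration(appId: String, executorId: String, localDirs: Seq[String])

// Hypothetical model of how the merge dir is recorded per application.
final class MergeDirRegistry {
  private val mergeDirs = new ConcurrentHashMap[String, String]()

  // Derive merge_dir as a sibling of the first registered block manager dir, e.g.
  // .../appcache/application_12345/blockmgr_X -> .../appcache/application_12345/merge_dir
  private def mergeDirFor(reg: ExecutorRegistration): String = {
    val blockMgrDir = reg.localDirs.head
    blockMgrDir.substring(0, blockMgrDir.lastIndexOf('/')) + "/merge_dir"
  }

  // Only the first registration per appId is recorded; later ones are ignored.
  def register(reg: ExecutorRegistration): Unit = {
    mergeDirs.putIfAbsent(reg.appId, mergeDirFor(reg))
    ()
  }

  def mergeDir(appId: String): Option[String] = Option(mergeDirs.get(appId))
}
```

   Because `putIfAbsent` keeps whichever value arrived first, no later registration for the same appId, including one from a later attempt, can change the recorded merge dir.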
   When the application finishes, we rely on the YARN NodeManager to clean up the whole dir /[yarn-local-dirs]/usercache/[username]/appcache/application_12345.
   1st issue: Since the shuffle service only uses the first executor registration message from an application, the dirs list from the first executor of the first attempt is kept throughout all later attempts.
   2nd issue: When the same application has multiple attempts, this dir is not cleaned up between attempts. So the merged shuffle files generated by the shuffle service during earlier attempts still exist in merge_dir. This is stale data, but the shuffle service will use it to serve merged shuffle fetches in later attempts.
   
   **Root cause of the issue:**
   Unmerged shuffle files do not have these issues, because 1. every executor registers its own local dirs with the shuffle service, and 2. block manager dirs have random IDs encoded in their paths. For the merged shuffle dir, we only use the very first executor registration message to get the merge_dir base location and ignore all later ones. However, the executor registration message does not encode the attemptID, so the shuffle service cannot tell whether messages come from different attempts. And YARN does not clean up the tmp directories while the application is still running, even across multiple attempts.
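   The contrast between the two kinds of dirs can be sketched with a temp directory standing in for the application's appcache dir. This is a hypothetical model: `AttemptDirs` and its methods are illustrative only, and the merged file name simply follows the `ShuffleMergedBlockId.name` format from the diff.

```scala
import java.nio.file.{Files, Path}
import java.util.UUID

object AttemptDirs {
  // Block manager dirs embed a random ID, so each executor in each attempt
  // gets a fresh, empty directory.
  def blockMgrDir(appDir: Path): Path =
    Files.createDirectories(appDir.resolve(s"blockmgr_${UUID.randomUUID()}"))

  // The merge dir depends only on the application dir, so every attempt of the
  // same application resolves to (and reuses) the same directory.
  def mergeDir(appDir: Path): Path =
    Files.createDirectories(appDir.resolve("merge_dir"))

  // Merged data file name, following the ShuffleMergedBlockId.name format.
  def mergedDataFile(mergeDir: Path, appId: String, shuffleId: Int, reduceId: Int): Path =
    mergeDir.resolve(s"mergedShuffle_${appId}_${shuffleId}_${reduceId}.data")
}
```

   If attempt 1 writes a merged file and attempt 2 then resolves the merge dir, it lands in the same directory and finds the earlier file still there, which is exactly the stale data described in the 2nd issue.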
   
   **Proposed solution:**
   Both proposed solutions require encoding the attemptID into the executor registration protocol, so that the shuffle service can use it to distinguish among multiple attempts.
   Solution 1: Create a new RegisterExecutor protocol message that adds one new field, AttemptID, to the original RegisterExecutor protocol. The new protocol will be used when push-based shuffle is enabled.
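   A minimal sketch of how a shuffle service might use an explicit attempt ID under Solution 1. Everything here is hypothetical: `RegisterExecutorWithAttempt` is not an existing Spark message, the `mergeDir` field stands in for the merge dir the service would derive from the registered local dirs, and it assumes attempt IDs are increasing integers so a newer attempt can be recognized.

```scala
import scala.collection.mutable

// Hypothetical Solution 1 message: the existing information plus an attempt ID.
final case class RegisterExecutorWithAttempt(
    appId: String,
    attemptId: Int,
    executorId: String,
    mergeDir: String)

// Hypothetical registry: "first registration wins" within an attempt, but a
// registration from a newer attempt replaces the recorded merge dir.
final class AttemptAwareRegistry {
  private final case class Entry(attemptId: Int, mergeDir: String)
  private val entries = mutable.Map.empty[String, Entry]

  def register(msg: RegisterExecutorWithAttempt): Unit = synchronized {
    entries.get(msg.appId) match {
      // Same or older attempt: keep what is already recorded.
      case Some(existing) if existing.attemptId >= msg.attemptId => ()
      // First registration, or a newer attempt: record it.
      case _ => entries(msg.appId) = Entry(msg.attemptId, msg.mergeDir)
    }
  }

  def current(appId: String): Option[(Int, String)] = synchronized {
    entries.get(appId).map(e => (e.attemptId, e.mergeDir))
  }
}
```

   Ignoring registrations from older attempts guards against a late message from a finished attempt overwriting the current one; a real implementation would presumably also need to discard the earlier attempt's merged files.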
   Solution 2: Don’t create a new protocol; leverage the existing one instead. We can encode “appID” + “attemptID” as the “new appID” in RegisterExecutor; let’s call it appAttemptID. The shuffle service can then tell with a regex whether the string is an original appID or an appAttemptID, and handle it accordingly.
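   A minimal sketch of the Solution 2 encoding and regex check. It assumes appIDs follow the YARN shape `application_<clusterTimestamp>_<sequence>` and uses a hypothetical `_` separator before the attemptID; `AppAttemptId`, `encode`, and `decode` are illustrative names only.

```scala
// Hypothetical Solution 2 encoding: appAttemptID = appID + "_" + attemptID.
object AppAttemptId {
  // Assumed YARN application ID shape: application_<clusterTimestamp>_<sequence>.
  private val PlainAppId = """(application_\d+_\d+)""".r
  // The same shape followed by the hypothetical "_<attemptId>" suffix.
  private val WithAttempt = """(application_\d+_\d+)_(\d+)""".r

  def encode(appId: String, attemptId: String): String = s"${appId}_$attemptId"

  // Returns (appId, Some(attemptId)) for an appAttemptID, (appId, None) for a
  // plain appID, and None for a string matching neither shape.
  // Scala's Regex extractors require the whole string to match.
  def decode(id: String): Option[(String, Option[String])] = id match {
    case WithAttempt(appId, attemptId) => Some((appId, Some(attemptId)))
    case PlainAppId(appId)             => Some((appId, None))
    case _                             => None
  }
}
```

   The trade-off is that the parser depends on the assumed appID shape: an ID in another format (for example the simplified `application_12345` used above) falls through to `None`, whereas Solution 1 carries the attemptID as an explicit field and needs no parsing.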
   
   CC @Ngone51 @tgravescs @attilapiros. Could you share your opinions on the issues and the proposed solutions? Thanks.



