zhouyejoe commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r637462835
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -414,23 +411,53 @@ public MergeStatuses
finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
Longs.toArray(sizes));
}
partitions.remove(appShuffleId);
- logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId,
msg.appId);
+ logger.info("Finalized shuffle {} from Application {}_{}.", msg.shuffleId,
msg.appId, msg.attemptId);
return mergeStatuses;
}
@Override
public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo)
{
if (logger.isDebugEnabled()) {
logger.debug("register executor with RemoteBlockPushResolver {}
local-dirs {} "
- + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
- executorInfo.subDirsPerLocalDir);
+ + "num sub-dirs {} shuffleManager {}", appId,
Arrays.toString(executorInfo.localDirs),
+ executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+ }
+ String shuffleManagerMeta = executorInfo.shuffleManager;
+ if (shuffleManagerMeta.contains(":")) {
+ String mergeDirInfo =
shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ MergeDirectoryMeta mergeDirectoryMeta = mapper.readValue(mergeDirInfo,
MergeDirectoryMeta.class);
+ if (mergeDirectoryMeta.attemptId == 0) {
+ // When attemptId is 0, there is no attemptId stored in the
ExecutorShuffleInfo.
+ // Only the first ExecutorRegister message can register the merge
dirs
+ appsPathInfo.computeIfAbsent(appId, id ->
+ new AppAttemptPathsInfo(appId, mergeDirectoryMeta.attemptId,
executorInfo.localDirs,
+ mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir));
+ } else {
+ // If attemptId is not 0, there is attemptId stored in the
ExecutorShuffleInfo.
+ // The first ExecutorRegister message from the same application
attempt will
+ // register the merge dirs in Shuffle Service. Any later
ExecutorRegister message
+ // won't override the merge dirs. But it can be overridden by
ExecutorRegister
+ // message from new app attempts.
+ appsPathInfo.compute(appId, (id, appAttemptPathsInfo) -> {
+ if (appAttemptPathsInfo != null && mergeDirectoryMeta.attemptId >
appAttemptPathsInfo.attemptId) {
+ appAttemptPathsInfo = new AppAttemptPathsInfo(appId,
mergeDirectoryMeta.attemptId,
+ executorInfo.localDirs, mergeDirectoryMeta.mergeDir,
executorInfo.subDirsPerLocalDir);
+ }
+ return appAttemptPathsInfo;
Review comment:
Added the remove part
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org