zhouyejoe commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r663589670
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -403,38 +394,78 @@ public MergeStatuses
finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
reduceIds.add(partition.reduceId);
sizes.add(partition.getLastChunkOffset());
} catch (IOException ioe) {
- logger.warn("Exception while finalizing shuffle partition {} {}
{}", msg.appId,
- msg.shuffleId, partition.reduceId, ioe);
+ logger.warn("Exception while finalizing shuffle partition {}_{} {}
{}", msg.appId,
+ msg.attemptId, msg.shuffleId, partition.reduceId, ioe);
} finally {
partition.closeAllFiles();
- // The partition should be removed after the files are written so
that any new stream
- // for the same reduce partition will see that the data file
exists.
- partitionsIter.remove();
}
}
}
mergeStatuses = new MergeStatuses(msg.shuffleId,
bitmaps.toArray(new RoaringBitmap[bitmaps.size()]),
Ints.toArray(reduceIds),
Longs.toArray(sizes));
}
- partitions.remove(appShuffleId);
- logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId,
msg.appId);
+ logger.info("Finalized shuffle {} from Application {}_{}.",
+ msg.shuffleId, msg.appId, msg.attemptId);
return mergeStatuses;
}
@Override
public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo)
{
if (logger.isDebugEnabled()) {
logger.debug("register executor with RemoteBlockPushResolver {}
local-dirs {} "
- + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
- executorInfo.subDirsPerLocalDir);
+ + "num sub-dirs {} shuffleManager {}", appId,
Arrays.toString(executorInfo.localDirs),
+ executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+ }
+ String shuffleManagerMeta = executorInfo.shuffleManager;
+ if (shuffleManagerMeta.contains(":")) {
+ String mergeDirInfo =
shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ MergeDirectoryMeta mergeDirectoryMeta =
+ mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class);
Review comment:
I have updated this part to encode/decode the merge directory info as a Map instead of using the MergeDirectoryMeta class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]