otterc commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r633726943
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -744,10 +771,12 @@ AppShufflePartitionInfo getPartitionInfo() {
*/
public static class AppShuffleId {
public final String appId;
+ public final int attemptId;
public final int shuffleId;
- AppShuffleId(String appId, int shuffleId) {
+ AppShuffleId(String appId, int attemptId, int shuffleId) {
Review comment:
I think we don't need to change this; instead, we can maintain another map from appId to the latest attempt information. It would be best to keep only the latest attempt's partition infos in the `partitions` map and remove any entries from old attempts. A rough sketch of that idea is below.
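For illustration only, a minimal sketch of such a map, assuming helper names (`LatestAttemptTracker`, `registerAndCheckLatest`) that are not part of this PR:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch, not code from this PR: track the latest known attempt per
// application so that AppShuffleId can stay keyed on (appId, shuffleId) only.
class LatestAttemptTracker {
  private final ConcurrentMap<String, Integer> latestAttempts = new ConcurrentHashMap<>();

  /** Records attemptId and returns true only if it is the newest attempt seen for appId. */
  boolean registerAndCheckLatest(String appId, int attemptId) {
    int latest = latestAttempts.merge(appId, attemptId, Math::max);
    return latest == attemptId;
  }

  /** Returns the newest attempt seen for appId, or -1 if the app is unknown. */
  int latestAttempt(String appId) {
    return latestAttempts.getOrDefault(appId, -1);
  }
}
```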
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
##########
@@ -30,12 +30,15 @@
*/
public class FinalizeShuffleMerge extends BlockTransferMessage {
public final String appId;
+ public final int attemptId;
Review comment:
`FinalizeShuffleMerge` is sent from the driver to the shuffle services. Is it possible for the driver of a previous attempt to still be alive while the driver of a new attempt is running? I think whenever the server receives a `FinalizeShuffleMerge` it should always assume it is from the latest attempt; a rough sketch of that check is below.
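Purely to illustrate that assumption (not code from this PR; `latestAttempts` is the hypothetical appId-to-latest-attempt map from the earlier sketch):

```java
// Illustrative sketch only: treat the finalize request's attempt as authoritative and
// reject anything older, so a stale driver cannot finalize merged data.
void checkFinalizeAttempt(String appId, int attemptId) {
  int latest = latestAttempts.merge(appId, attemptId, Math::max);
  if (attemptId < latest) {
    throw new IllegalArgumentException("Stale FinalizeShuffleMerge from attempt "
      + attemptId + " for app " + appId + "; latest attempt is " + latest);
  }
  // otherwise: proceed with normal finalization for the latest attempt
}
```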
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -117,7 +116,7 @@ public ShuffleIndexInformation load(File file) throws IOException {
private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
AppShuffleId appShuffleId,
int reduceId) {
- File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
+ File dataFile = getMergedShuffleDataFile(appShuffleId.appId, appShuffleId.shuffleId, reduceId);
Review comment:
When the server receives a pushBlock message, there are 2 cases:
1. It belongs to a previous attempt: in this case the server needs to ignore that message. If we don't, it will corrupt the merged data file here.
2. It belongs to the latest attempt: in this case we should continue merging.
A rough sketch of this check is shown after the list.
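As an illustration only (names such as `PushBlockGuard` and `shouldMerge` are hypothetical, not part of this PR):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not code from this PR: decide whether a pushed block may be
// merged, based on which app attempt it came from.
class PushBlockGuard {
  private final Map<String, Integer> latestAttempts = new ConcurrentHashMap<>();

  /** Returns true if the block should be merged, false if it must be ignored. */
  boolean shouldMerge(String appId, int blockAttemptId) {
    Integer latest = latestAttempts.get(appId);
    if (latest != null && blockAttemptId < latest) {
      // Case 1: the block is from a previous attempt; appending it would corrupt the
      // merged shuffle data file of the new attempt, so drop it.
      return false;
    }
    // Case 2: the block belongs to the latest known attempt; continue merging.
    return true;
  }
}
```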
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -414,23 +411,53 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
Longs.toArray(sizes));
}
partitions.remove(appShuffleId);
- logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
+ logger.info("Finalized shuffle {} from Application {}_{}.", msg.shuffleId, msg.appId, msg.attemptId);
return mergeStatuses;
}
@Override
public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
if (logger.isDebugEnabled()) {
logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} "
- + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
- executorInfo.subDirsPerLocalDir);
+ + "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs),
+ executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+ }
+ String shuffleManagerMeta = executorInfo.shuffleManager;
+ if (shuffleManagerMeta.contains(":")) {
+ String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ MergeDirectoryMeta mergeDirectoryMeta = mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class);
+ if (mergeDirectoryMeta.attemptId == 0) {
+ // When attemptId is 0, there is no attemptId stored in the ExecutorShuffleInfo.
+ // Only the first ExecutorRegister message can register the merge dirs
+ appsPathInfo.computeIfAbsent(appId, id ->
+ new AppAttemptPathsInfo(appId, mergeDirectoryMeta.attemptId, executorInfo.localDirs,
+ mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir));
+ } else {
+ // If attemptId is not 0, there is attemptId stored in the ExecutorShuffleInfo.
+ // The first ExecutorRegister message from the same application attempt will
+ // register the merge dirs in Shuffle Service. Any later ExecutorRegister message
+ // won't override the merge dirs. But it can be overridden by ExecutorRegister
+ // message from new app attempts.
+ appsPathInfo.compute(appId, (id, appAttemptPathsInfo) -> {
+ if (appAttemptPathsInfo != null && mergeDirectoryMeta.attemptId > appAttemptPathsInfo.attemptId) {
+ appAttemptPathsInfo = new AppAttemptPathsInfo(appId, mergeDirectoryMeta.attemptId,
+ executorInfo.localDirs, mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir);
+ }
+ return appAttemptPathsInfo;
Review comment:
We are not removing entries from `partitions` here? If we don't keep only the active partition infos in the `partitions` map, it will add complexity when saving to LevelDB. Also, these stale entries are not removed during finalization, which means they will stay in memory until the app is removed even though they have no use. A sketch of how this `compute` could also purge them is below.
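For illustration only, one way the lambda quoted above could also drop partition state from older attempts; it assumes the PR's `partitions` map is keyed by an `AppShuffleId` carrying `appId` and `attemptId`, and is a sketch rather than a tested change:

```java
// Sketch only, not a tested change: when a newer attempt registers its merge dirs,
// purge partition infos recorded under older attempts of the same app.
appsPathInfo.compute(appId, (id, appAttemptPathsInfo) -> {
  if (appAttemptPathsInfo != null && mergeDirectoryMeta.attemptId > appAttemptPathsInfo.attemptId) {
    // Assumes AppShuffleId now exposes appId and attemptId, as in this PR.
    partitions.keySet().removeIf(key ->
      key.appId.equals(appId) && key.attemptId < mergeDirectoryMeta.attemptId);
    appAttemptPathsInfo = new AppAttemptPathsInfo(appId, mergeDirectoryMeta.attemptId,
      executorInfo.localDirs, mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir);
  }
  return appAttemptPathsInfo;
});
```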
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -991,22 +1023,26 @@ int getNumIOExceptions() {
/**
* Wraps all the information related to the merge directory of an application.
*/
- private static class AppPathsInfo {
+ private static class AppAttemptPathsInfo {
+ private final int attemptId;
private final String[] activeLocalDirs;
private final int subDirsPerLocalDir;
- private AppPathsInfo(
+ private AppAttemptPathsInfo(
String appId,
+ int attemptId,
Review comment:
We can avoid this by maintaining a separate map of the latest attempt for each app. Or, if you do want to include `attemptId` in this class, then maybe rename the class to something like `AppMergedMeta` and update its description; a rough sketch is below.
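Just to illustrate the renaming option (the name `AppMergedMeta` is only the suggestion above, not something already in this PR):

```java
/**
 * Sketch only: wraps the merged-shuffle metadata of a single attempt of an application
 * (merge directory layout plus the attempt it belongs to).
 */
private static class AppMergedMeta {
  private final int attemptId;
  private final String[] activeLocalDirs;
  private final int subDirsPerLocalDir;

  AppMergedMeta(int attemptId, String[] activeLocalDirs, int subDirsPerLocalDir) {
    this.attemptId = attemptId;
    this.activeLocalDirs = activeLocalDirs;
    this.subDirsPerLocalDir = subDirsPerLocalDir;
  }
}
```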