mridulm commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r658525245
########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -73,27 +76,26 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { private static final Logger logger = LoggerFactory.getLogger(RemoteBlockPushResolver.class); - @VisibleForTesting - static final String MERGE_MANAGER_DIR = "merge_manager"; + public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged"; - private final ConcurrentMap<String, AppPathsInfo> appsPathInfo; - private final ConcurrentMap<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> partitions; + // A three level map to store the merged shuffle information with ApplicationID as the key Review comment: While it boils down to being a three level map from the point of view of getting to `AppShufflePartitionInfo` from here, `appsShuffleInfo` itself is just a one level map - replace the comment with details of what `appsShuffleInfo` is instead? Something along the lines of "All the ongoing shuffles for each application id"? ########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -73,27 +76,26 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { Review comment: High level comment about changes in this class: * Query `AppShuffleInfo` only once for each public method using `validateAndGetAppShuffleInfo` (except for application removed - I have commented on that separately) - everything else should be depending only on that returned instance. * None of the private methods should query `validateAndGetAppShuffleInfo`. * Move `getFile`, `getMergedShuffleDataFile`, `getMergedShuffleIndexFile`, `getMergedShuffleMetaFile`, `generateFileName` into `AppShuffleInfo` - and use the appShuffleInfo from the first point above to fetch these values. 
* Add `appId` as a `final` field of `AppShuffleInfo` - and do not pass `appId` into any of the methods - simply pass appShuffleInfo. * Also, see if `appId` can be removed from `AppShufflePartitionInfo` - given it will be in the containing `AppShuffleInfo` always. ########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -403,8 +462,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc reduceIds.add(partition.reduceId); sizes.add(partition.getLastChunkOffset()); } catch (IOException ioe) { - logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId, - msg.shuffleId, partition.reduceId, ioe); + logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId, + msg.attemptId, msg.shuffleId, partition.reduceId, ioe); } finally { partition.closeAllFiles(); // The partition should be removed after the files are written so that any new stream Review comment: You can remove the `partitionsIter.remove()` below - since `shufflePartitions` has been removed from `appShuffleInfo.partitions` already. 
########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -217,61 +237,74 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI */ private File getFile(String appId, String filename) { // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); - File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs, - appPathsInfo.subDirsPerLocalDir, filename); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + File targetFile = ExecutorDiskUtils.getFile(appShuffleInfo.appPathsInfo.activeLocalDirs, + appShuffleInfo.appPathsInfo.subDirsPerLocalDir, filename); logger.debug("Get merged file {}", targetFile.getAbsolutePath()); return targetFile; } - private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) { - String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, fileName); + private File getMergedShuffleDataFile( + String appId, + int shuffleId, + int reduceId) { + String fileName = String.format("%s.data", generateFileName(appId, shuffleId, reduceId)); + return getFile(appId, fileName); } - private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) { - String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, indexName); + private File getMergedShuffleIndexFile( + String appId, + int shuffleId, + int reduceId) { + String indexName = String.format("%s.index", generateFileName(appId, shuffleId, reduceId)); + return getFile(appId, indexName); } - private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) { - String metaName = String.format("%s.meta", 
generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, metaName); + private File getMergedShuffleMetaFile( + String appId, + int shuffleId, + int reduceId) { + String metaName = String.format("%s.meta", generateFileName(appId, shuffleId, reduceId)); + return getFile(appId, metaName); } @Override public String[] getMergedBlockDirs(String appId) { - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); - String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs, - "application " + appId - + " active local dirs list has not been updated by any executor registration"); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + String[] activeLocalDirs = + Preconditions.checkNotNull(appShuffleInfo.appPathsInfo.activeLocalDirs, + "application " + appId + " active local dirs list has not been updated " + + "by any executor registration"); return activeLocalDirs; } @Override public void applicationRemoved(String appId, boolean cleanupLocalDirs) { logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs); - // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId), - "application " + appId + " is not registered or NM was restarted."); - Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> iterator = - partitions.entrySet().iterator(); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + cleanupShufflePartitionInfo(appShuffleInfo); + appsShuffleInfo.remove(appId); + if (cleanupLocalDirs) { + Path[] dirs = Arrays.stream(appShuffleInfo.appPathsInfo.activeLocalDirs) + .map(dir -> Paths.get(dir)).toArray(Path[]::new); + directoryCleaner.execute(() -> deleteExecutorDirs(dirs)); + } Review comment: ```suggestion AppShuffleInfo appShuffleInfo = 
appsShuffleInfo.remove(appId); if (null != appShuffleInfo) { cleanupShufflePartitionInfo(appShuffleInfo); if (cleanupLocalDirs) { Path[] dirs = Arrays.stream(appShuffleInfo.appPathsInfo.activeLocalDirs) .map(dir -> Paths.get(dir)).toArray(Path[]::new); directoryCleaner.execute(() -> deleteExecutorDirs(dirs)); } } ``` ########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -338,51 +418,30 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) { final AppShufflePartitionInfo partitionInfo = partitionInfoBeforeCheck != null && partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null : partitionInfoBeforeCheck; - final String streamId = String.format("%s_%d_%d_%d", - OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, appShuffleId.shuffleId, msg.mapIndex, - msg.reduceId); if (partitionInfo != null) { return new PushBlockStreamCallback(this, streamId, partitionInfo, msg.mapIndex); } else { // For a duplicate block or a block which is late, respond back with a callback that handles // them differently. - return new StreamCallbackWithID() { - @Override - public String getID() { - return streamId; - } - - @Override - public void onData(String streamId, ByteBuffer buf) { - // Ignore the requests. It reaches here either when a request is received after the - // shuffle file is finalized or when a request is for a duplicate block. - } - - @Override - public void onComplete(String streamId) { - if (isTooLate) { - // Throw an exception here so the block data is drained from channel and server - // responds RpcFailure to the client. - throw new RuntimeException(String.format("Block %s %s", streamId, - ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX)); - } - // For duplicate block that is received before the shuffle merge finalizes, the - // server should respond success to the client. 
- } - - @Override - public void onFailure(String streamId, Throwable cause) { - } - }; + if (isTooLate) { + // Throw RuntimeException in client as of block arrives too late + return createCallbackForInvalidPushBlocks( + streamId, ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX); + } else { + // Duplicate block received, no error message and exception will be thrown in client. + return createCallbackForInvalidPushBlocks(streamId, null); + } } } @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @Override public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException { - logger.info("Finalizing shuffle {} from Application {}.", msg.shuffleId, msg.appId); - AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId); - Map<Integer, AppShufflePartitionInfo> shufflePartitions = partitions.get(appShuffleId); + logger.info("Finalizing shuffle {} from Application {}_{}.", + msg.shuffleId, msg.appId, msg.attemptId); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId); + Map<Integer, AppShufflePartitionInfo> shufflePartitions = + appShuffleInfo.partitions.get(msg.shuffleId); Review comment: ```suggestion Map<Integer, AppShufflePartitionInfo> shufflePartitions = appShuffleInfo.partitions.remove(msg.shuffleId); ``` The subsequent `remove` below is not needed. 
########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -217,61 +237,74 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI */ private File getFile(String appId, String filename) { // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); - File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs, - appPathsInfo.subDirsPerLocalDir, filename); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + File targetFile = ExecutorDiskUtils.getFile(appShuffleInfo.appPathsInfo.activeLocalDirs, + appShuffleInfo.appPathsInfo.subDirsPerLocalDir, filename); logger.debug("Get merged file {}", targetFile.getAbsolutePath()); return targetFile; } - private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) { - String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, fileName); + private File getMergedShuffleDataFile( + String appId, + int shuffleId, + int reduceId) { + String fileName = String.format("%s.data", generateFileName(appId, shuffleId, reduceId)); + return getFile(appId, fileName); } - private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) { - String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, indexName); + private File getMergedShuffleIndexFile( + String appId, + int shuffleId, + int reduceId) { + String indexName = String.format("%s.index", generateFileName(appId, shuffleId, reduceId)); + return getFile(appId, indexName); } - private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) { - String metaName = String.format("%s.meta", 
generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, metaName); + private File getMergedShuffleMetaFile( + String appId, + int shuffleId, + int reduceId) { + String metaName = String.format("%s.meta", generateFileName(appId, shuffleId, reduceId)); + return getFile(appId, metaName); } @Override public String[] getMergedBlockDirs(String appId) { - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); - String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs, - "application " + appId - + " active local dirs list has not been updated by any executor registration"); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + String[] activeLocalDirs = + Preconditions.checkNotNull(appShuffleInfo.appPathsInfo.activeLocalDirs, + "application " + appId + " active local dirs list has not been updated " + + "by any executor registration"); return activeLocalDirs; } @Override public void applicationRemoved(String appId, boolean cleanupLocalDirs) { logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs); - // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId), - "application " + appId + " is not registered or NM was restarted."); - Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> iterator = - partitions.entrySet().iterator(); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + cleanupShufflePartitionInfo(appShuffleInfo); + appsShuffleInfo.remove(appId); + if (cleanupLocalDirs) { + Path[] dirs = Arrays.stream(appShuffleInfo.appPathsInfo.activeLocalDirs) + .map(dir -> Paths.get(dir)).toArray(Path[]::new); + directoryCleaner.execute(() -> deleteExecutorDirs(dirs)); + } + } + + /** + * Clean up the AppShufflePartitionInfo for a 
specific AppShuffleInfo. + */ + private void cleanupShufflePartitionInfo(AppShuffleInfo appShuffleInfo) { + Iterator<Map.Entry<Integer, Map<Integer, AppShufflePartitionInfo>>> iterator = + appShuffleInfo.partitions.entrySet().iterator(); while (iterator.hasNext()) { - Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry = iterator.next(); - AppShuffleId appShuffleId = entry.getKey(); - if (appId.equals(appShuffleId.appId)) { - iterator.remove(); - for (AppShufflePartitionInfo partitionInfo : entry.getValue().values()) { + Map.Entry<Integer, Map<Integer, AppShufflePartitionInfo>> entry = iterator.next(); + for (AppShufflePartitionInfo partitionInfo : entry.getValue().values()) { + synchronized (partitionInfo) { partitionInfo.closeAllFiles(); } } - } - if (cleanupLocalDirs) { - Path[] dirs = Arrays.stream(appPathsInfo.activeLocalDirs) - .map(dir -> Paths.get(dir)).toArray(Path[]::new); - directoryCleaner.execute(() -> deleteExecutorDirs(dirs)); + iterator.remove(); Review comment: You don't need to call `iterator.remove()` here - the entire map has been discarded. 
########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -217,61 +237,74 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI */ private File getFile(String appId, String filename) { // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); - File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs, - appPathsInfo.subDirsPerLocalDir, filename); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + File targetFile = ExecutorDiskUtils.getFile(appShuffleInfo.appPathsInfo.activeLocalDirs, + appShuffleInfo.appPathsInfo.subDirsPerLocalDir, filename); logger.debug("Get merged file {}", targetFile.getAbsolutePath()); return targetFile; } - private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) { - String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, fileName); + private File getMergedShuffleDataFile( + String appId, + int shuffleId, + int reduceId) { + String fileName = String.format("%s.data", generateFileName(appId, shuffleId, reduceId)); + return getFile(appId, fileName); } - private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) { - String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, indexName); + private File getMergedShuffleIndexFile( + String appId, + int shuffleId, + int reduceId) { + String indexName = String.format("%s.index", generateFileName(appId, shuffleId, reduceId)); + return getFile(appId, indexName); } - private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) { - String metaName = String.format("%s.meta", 
generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, metaName); + private File getMergedShuffleMetaFile( + String appId, + int shuffleId, + int reduceId) { + String metaName = String.format("%s.meta", generateFileName(appId, shuffleId, reduceId)); + return getFile(appId, metaName); } @Override public String[] getMergedBlockDirs(String appId) { - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); - String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs, - "application " + appId - + " active local dirs list has not been updated by any executor registration"); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + String[] activeLocalDirs = + Preconditions.checkNotNull(appShuffleInfo.appPathsInfo.activeLocalDirs, + "application " + appId + " active local dirs list has not been updated " + + "by any executor registration"); return activeLocalDirs; } @Override public void applicationRemoved(String appId, boolean cleanupLocalDirs) { logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs); - // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId), - "application " + appId + " is not registered or NM was restarted."); - Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> iterator = - partitions.entrySet().iterator(); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + cleanupShufflePartitionInfo(appShuffleInfo); + appsShuffleInfo.remove(appId); + if (cleanupLocalDirs) { + Path[] dirs = Arrays.stream(appShuffleInfo.appPathsInfo.activeLocalDirs) + .map(dir -> Paths.get(dir)).toArray(Path[]::new); + directoryCleaner.execute(() -> deleteExecutorDirs(dirs)); Review comment: Move this into `AppPathsInfo` as a 
`cleanupLocalDirs()` method? ########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -338,51 +418,30 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) { final AppShufflePartitionInfo partitionInfo = partitionInfoBeforeCheck != null && partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null : partitionInfoBeforeCheck; - final String streamId = String.format("%s_%d_%d_%d", - OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, appShuffleId.shuffleId, msg.mapIndex, - msg.reduceId); if (partitionInfo != null) { return new PushBlockStreamCallback(this, streamId, partitionInfo, msg.mapIndex); } else { // For a duplicate block or a block which is late, respond back with a callback that handles // them differently. - return new StreamCallbackWithID() { - @Override - public String getID() { - return streamId; - } - - @Override - public void onData(String streamId, ByteBuffer buf) { - // Ignore the requests. It reaches here either when a request is received after the - // shuffle file is finalized or when a request is for a duplicate block. - } - - @Override - public void onComplete(String streamId) { - if (isTooLate) { - // Throw an exception here so the block data is drained from channel and server - // responds RpcFailure to the client. - throw new RuntimeException(String.format("Block %s %s", streamId, - ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX)); - } - // For duplicate block that is received before the shuffle merge finalizes, the - // server should respond success to the client. 
- } - - @Override - public void onFailure(String streamId, Throwable cause) { - } - }; + if (isTooLate) { + // Throw RuntimeException in client as of block arrives too late + return createCallbackForInvalidPushBlocks( + streamId, ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX); + } else { + // Duplicate block received, no error message and exception will be thrown in client. + return createCallbackForInvalidPushBlocks(streamId, null); + } } } @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @Override public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException { - logger.info("Finalizing shuffle {} from Application {}.", msg.shuffleId, msg.appId); - AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId); - Map<Integer, AppShufflePartitionInfo> shufflePartitions = partitions.get(appShuffleId); + logger.info("Finalizing shuffle {} from Application {}_{}.", + msg.shuffleId, msg.appId, msg.attemptId); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId); + Map<Integer, AppShufflePartitionInfo> shufflePartitions = + appShuffleInfo.partitions.get(msg.shuffleId); MergeStatuses mergeStatuses; if (shufflePartitions == null || shufflePartitions.isEmpty()) { mergeStatuses = Review comment: Replace the use of `partitionsIter` below with iterator over `partitionsToFinalize` (`for (AppShufflePartitionInfo partition: partitionsToFinalize)` ) ########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -112,34 +114,49 @@ public ShuffleIndexInformation load(File file) throws IOException { this.errorHandler = new ErrorHandler.BlockPushErrorHandler(); } + private AppShuffleInfo validateAndGetAppShuffleInfo(String appId) { + // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart + AppShuffleInfo appShuffleInfo = + Preconditions.checkNotNull(appsShuffleInfo.get(appId), + "application " 
+ appId + " is not registered or NM was restarted."); + return appShuffleInfo; + } + /** * Given the appShuffleId and reduceId that uniquely identifies a given shuffle partition of an * application, retrieves the associated metadata. If not present and the corresponding merged * shuffle does not exist, initializes the metadata. */ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( - AppShuffleId appShuffleId, + String appId, + int shuffleId, int reduceId) { - File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId); - if (!partitions.containsKey(appShuffleId) && dataFile.exists()) { + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); Review comment: Should not query `validateAndGetAppShuffleInfo` - pass `appShuffleInfo` as param. ########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -417,24 +476,75 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes)); } - partitions.remove(appShuffleId); - logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId); + appShuffleInfo.partitions.remove(msg.shuffleId); + logger.info("Finalized shuffle {} from Application {}_{}.", + msg.shuffleId, msg.appId, msg.attemptId); return mergeStatuses; } @Override public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { if (logger.isDebugEnabled()) { logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} " - + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs), - executorInfo.subDirsPerLocalDir); + + "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs), + executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager); + } + String shuffleManagerMeta = executorInfo.shuffleManager; + if (shuffleManagerMeta.contains(":")) { + String 
mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1); + try { + ObjectMapper mapper = new ObjectMapper(); + MergeDirectoryMeta mergeDirectoryMeta = + mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class); + if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) { + // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo. + // Only the first ExecutorRegister message can register the merge dirs + appsShuffleInfo.computeIfAbsent(appId, id -> + new AppShuffleInfo( + mergeDirectoryMeta.attemptId, + new AppPathsInfo(appId, executorInfo.localDirs, + mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir) + )); + } else { + // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo. + // The first ExecutorRegister message from the same application attempt wil register + // the merge dirs in External Shuffle Service. Any later ExecutorRegister message + // from the same application attempt will not override the merge dirs. But it can + // be overridden by ExecutorRegister message from newer application attempt, + // and former attempts' shuffle partitions information will also be cleaned up. + AtomicReference<AppShuffleInfo> originalAppShuffleInfo = new AtomicReference<>(); Review comment: Make this a `ConcurrentHashMap<Integer, AppShuffleInfo>` (key == attemptId) - in theory, `compute` can invoke the closure multiple times - whenever there is an overlap in computation (in practice it should simply be zero or one attempt id getting replaced). 
########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -217,61 +237,74 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI */ private File getFile(String appId, String filename) { // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); - File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs, - appPathsInfo.subDirsPerLocalDir, filename); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + File targetFile = ExecutorDiskUtils.getFile(appShuffleInfo.appPathsInfo.activeLocalDirs, + appShuffleInfo.appPathsInfo.subDirsPerLocalDir, filename); logger.debug("Get merged file {}", targetFile.getAbsolutePath()); return targetFile; } - private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) { - String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, fileName); + private File getMergedShuffleDataFile( + String appId, + int shuffleId, + int reduceId) { + String fileName = String.format("%s.data", generateFileName(appId, shuffleId, reduceId)); + return getFile(appId, fileName); } - private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) { - String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, indexName); + private File getMergedShuffleIndexFile( + String appId, + int shuffleId, + int reduceId) { + String indexName = String.format("%s.index", generateFileName(appId, shuffleId, reduceId)); + return getFile(appId, indexName); } - private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) { - String metaName = String.format("%s.meta", 
generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, metaName); + private File getMergedShuffleMetaFile( + String appId, + int shuffleId, + int reduceId) { + String metaName = String.format("%s.meta", generateFileName(appId, shuffleId, reduceId)); + return getFile(appId, metaName); } @Override public String[] getMergedBlockDirs(String appId) { - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); - String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs, - "application " + appId - + " active local dirs list has not been updated by any executor registration"); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + String[] activeLocalDirs = + Preconditions.checkNotNull(appShuffleInfo.appPathsInfo.activeLocalDirs, + "application " + appId + " active local dirs list has not been updated " + + "by any executor registration"); return activeLocalDirs; } @Override public void applicationRemoved(String appId, boolean cleanupLocalDirs) { logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs); - // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId), - "application " + appId + " is not registered or NM was restarted."); - Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> iterator = - partitions.entrySet().iterator(); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + cleanupShufflePartitionInfo(appShuffleInfo); + appsShuffleInfo.remove(appId); + if (cleanupLocalDirs) { + Path[] dirs = Arrays.stream(appShuffleInfo.appPathsInfo.activeLocalDirs) + .map(dir -> Paths.get(dir)).toArray(Path[]::new); + directoryCleaner.execute(() -> deleteExecutorDirs(dirs)); + } + } + + /** + * Clean up the AppShufflePartitionInfo for a 
specific AppShuffleInfo. + */ + private void cleanupShufflePartitionInfo(AppShuffleInfo appShuffleInfo) { + Iterator<Map.Entry<Integer, Map<Integer, AppShufflePartitionInfo>>> iterator = + appShuffleInfo.partitions.entrySet().iterator(); while (iterator.hasNext()) { Review comment: ```suggestion for (Map<Integer, AppShufflePartitionInfo> partitionMap : appShuffleInfo.partitions.values()) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org