zhouyejoe commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r648743781
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -45,18 +45,21 @@
private final TransportClient client;
private final String appId;
+ private final int attemptId;
private final String[] blockIds;
private final BlockFetchingListener listener;
private final Map<String, ManagedBuffer> buffers;
Review comment:
Handling of multiple application attempts will be resolved in SPARK-35546.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -211,63 +214,74 @@ public ManagedBuffer getMergedBlockData(String appId, int
shuffleId, int reduceI
/**
* The logic here is consistent with
- * org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
+ * @see
[[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(org.apache.spark.storage.BlockId)]]
*/
private File getFile(String appId, String filename) {
// TODO: [SPARK-33236] Change the message when this service is able to
handle NM restart
- AppPathsInfo appPathsInfo =
Preconditions.checkNotNull(appsPathInfo.get(appId),
+ AppAttemptPathsInfo appAttemptPathsInfo =
Preconditions.checkNotNull(appsPathsInfo.get(appId),
"application " + appId + " is not registered or NM was restarted.");
- File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
- appPathsInfo.subDirsPerLocalDir, filename);
+ File targetFile =
ExecutorDiskUtils.getFile(appAttemptPathsInfo.activeLocalDirs,
+ appAttemptPathsInfo.subDirsPerLocalDir, filename);
logger.debug("Get merged file {}", targetFile.getAbsolutePath());
return targetFile;
}
- private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int
reduceId) {
- String fileName = String.format("%s.data", generateFileName(appShuffleId,
reduceId));
- return getFile(appShuffleId.appId, fileName);
+ private File getMergedShuffleDataFile(String appId, int shuffleId, int
reduceId) {
+ String fileName = String.format("%s.data", generateFileName(appId,
shuffleId, reduceId));
+ return getFile(appId, fileName);
}
- private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int
reduceId) {
- String indexName = String.format("%s.index",
generateFileName(appShuffleId, reduceId));
- return getFile(appShuffleId.appId, indexName);
+ private File getMergedShuffleIndexFile(String appId, int shuffleId, int
reduceId) {
+ String indexName = String.format("%s.index", generateFileName(appId,
shuffleId, reduceId));
+ return getFile(appId, indexName);
}
- private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int
reduceId) {
- String metaName = String.format("%s.meta", generateFileName(appShuffleId,
reduceId));
- return getFile(appShuffleId.appId, metaName);
+ private File getMergedShuffleMetaFile(String appId, int shuffleId, int
reduceId) {
+ String metaName = String.format("%s.meta", generateFileName(appId,
shuffleId, reduceId));
+ return getFile(appId, metaName);
}
@Override
public String[] getMergedBlockDirs(String appId) {
- AppPathsInfo appPathsInfo =
Preconditions.checkNotNull(appsPathInfo.get(appId),
+ AppAttemptPathsInfo appAttemptPathsInfo =
Preconditions.checkNotNull(appsPathsInfo.get(appId),
"application " + appId + " is not registered or NM was restarted.");
- String[] activeLocalDirs =
Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
+ String[] activeLocalDirs =
Preconditions.checkNotNull(appAttemptPathsInfo.activeLocalDirs,
"application " + appId
+ " active local dirs list has not been updated by any executor
registration");
return activeLocalDirs;
}
- @Override
- public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
- logger.info("Application {} removed, cleanupLocalDirs = {}", appId,
cleanupLocalDirs);
- // TODO: [SPARK-33236] Change the message when this service is able to
handle NM restart
- AppPathsInfo appPathsInfo =
Preconditions.checkNotNull(appsPathInfo.remove(appId),
- "application " + appId + " is not registered or NM was restarted.");
- Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>>
iterator =
+ /**
+ * Clean up the AppShufflePartitionInfo for a specific application attempt.
+ * If attemptId is -1, it means to clean up all the AppShufflePartitionInfo
from
+ * all the attempts. Otherwise, only the AppShufflePartitionInfo from the
specific
+ * application attempt will be cleaned up.
+ */
+ private void cleanupShufflePartitionInfo(String appId, int attemptId) {
+ Iterator<Map.Entry<AppAttemptShuffleId, Map<Integer,
AppAttemptShufflePartitionInfo>>> iterator =
partitions.entrySet().iterator();
while (iterator.hasNext()) {
- Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry =
iterator.next();
- AppShuffleId appShuffleId = entry.getKey();
- if (appId.equals(appShuffleId.appId)) {
+ Map.Entry<AppAttemptShuffleId, Map<Integer,
AppAttemptShufflePartitionInfo>> entry = iterator.next();
+ AppAttemptShuffleId appAttemptShuffleId = entry.getKey();
+ if (appId.equals(appAttemptShuffleId.appId)
+ && (attemptId == -1 || attemptId == appAttemptShuffleId.attemptId)) {
iterator.remove();
- for (AppShufflePartitionInfo partitionInfo :
entry.getValue().values()) {
+ for (AppAttemptShufflePartitionInfo partitionInfo :
entry.getValue().values()) {
partitionInfo.closeAllFiles();
}
}
}
+ }
+
+ @Override
+ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+ logger.info("Application {} removed, cleanupLocalDirs = {}", appId,
cleanupLocalDirs);
+ // TODO: [SPARK-33236] Change the message when this service is able to
handle NM restart
+ AppAttemptPathsInfo appAttemptPathsInfo =
Preconditions.checkNotNull(appsPathsInfo.remove(appId),
+ "application " + appId + " is not registered or NM was restarted.");
+ cleanupShufflePartitionInfo(appId, -1);
Review comment:
Handling of multiple application attempts will be resolved in SPARK-35546.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]