zhouyejoe commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r640349087
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -211,63 +214,74 @@ public ManagedBuffer getMergedBlockData(String appId, int
shuffleId, int reduceI
/**
* The logic here is consistent with
- * org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
+ * @see
[[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(org.apache.spark.storage.BlockId)]]
*/
private File getFile(String appId, String filename) {
// TODO: [SPARK-33236] Change the message when this service is able to
handle NM restart
- AppPathsInfo appPathsInfo =
Preconditions.checkNotNull(appsPathInfo.get(appId),
+ AppAttemptPathsInfo appAttemptPathsInfo =
Preconditions.checkNotNull(appsPathsInfo.get(appId),
"application " + appId + " is not registered or NM was restarted.");
- File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
- appPathsInfo.subDirsPerLocalDir, filename);
+ File targetFile =
ExecutorDiskUtils.getFile(appAttemptPathsInfo.activeLocalDirs,
+ appAttemptPathsInfo.subDirsPerLocalDir, filename);
logger.debug("Get merged file {}", targetFile.getAbsolutePath());
return targetFile;
}
- private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int
reduceId) {
- String fileName = String.format("%s.data", generateFileName(appShuffleId,
reduceId));
- return getFile(appShuffleId.appId, fileName);
+ private File getMergedShuffleDataFile(String appId, int shuffleId, int
reduceId) {
+ String fileName = String.format("%s.data", generateFileName(appId,
shuffleId, reduceId));
+ return getFile(appId, fileName);
}
- private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int
reduceId) {
- String indexName = String.format("%s.index",
generateFileName(appShuffleId, reduceId));
- return getFile(appShuffleId.appId, indexName);
+ private File getMergedShuffleIndexFile(String appId, int shuffleId, int
reduceId) {
+ String indexName = String.format("%s.index", generateFileName(appId,
shuffleId, reduceId));
+ return getFile(appId, indexName);
}
- private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int
reduceId) {
- String metaName = String.format("%s.meta", generateFileName(appShuffleId,
reduceId));
- return getFile(appShuffleId.appId, metaName);
+ private File getMergedShuffleMetaFile(String appId, int shuffleId, int
reduceId) {
+ String metaName = String.format("%s.meta", generateFileName(appId,
shuffleId, reduceId));
+ return getFile(appId, metaName);
}
@Override
public String[] getMergedBlockDirs(String appId) {
- AppPathsInfo appPathsInfo =
Preconditions.checkNotNull(appsPathInfo.get(appId),
+ AppAttemptPathsInfo appAttemptPathsInfo =
Preconditions.checkNotNull(appsPathsInfo.get(appId),
"application " + appId + " is not registered or NM was restarted.");
- String[] activeLocalDirs =
Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
+ String[] activeLocalDirs =
Preconditions.checkNotNull(appAttemptPathsInfo.activeLocalDirs,
"application " + appId
+ " active local dirs list has not been updated by any executor
registration");
return activeLocalDirs;
}
- @Override
- public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
- logger.info("Application {} removed, cleanupLocalDirs = {}", appId,
cleanupLocalDirs);
- // TODO: [SPARK-33236] Change the message when this service is able to
handle NM restart
- AppPathsInfo appPathsInfo =
Preconditions.checkNotNull(appsPathInfo.remove(appId),
- "application " + appId + " is not registered or NM was restarted.");
- Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>>
iterator =
+ /**
+ * Clean up the AppShufflePartitionInfo for a specific application attempt.
+ * If attemptId is -1, it means to clean up all the AppShufflePartitionInfo
from
+ * all the attempts. Otherwise, only the AppShufflePartitionInfo from the
specific
+ * application attempt will be cleaned up.
+ */
+ private void cleanupShufflePartitionInfo(String appId, int attemptId) {
+ Iterator<Map.Entry<AppAttemptShuffleId, Map<Integer,
AppAttemptShufflePartitionInfo>>> iterator =
partitions.entrySet().iterator();
while (iterator.hasNext()) {
- Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry =
iterator.next();
- AppShuffleId appShuffleId = entry.getKey();
- if (appId.equals(appShuffleId.appId)) {
+ Map.Entry<AppAttemptShuffleId, Map<Integer,
AppAttemptShufflePartitionInfo>> entry = iterator.next();
+ AppAttemptShuffleId appAttemptShuffleId = entry.getKey();
+ if (appId.equals(appAttemptShuffleId.appId)
+ && (attemptId == -1 || attemptId == appAttemptShuffleId.attemptId)) {
iterator.remove();
- for (AppShufflePartitionInfo partitionInfo :
entry.getValue().values()) {
+ for (AppAttemptShufflePartitionInfo partitionInfo :
entry.getValue().values()) {
partitionInfo.closeAllFiles();
}
}
}
+ }
+
+ @Override
+ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+ logger.info("Application {} removed, cleanupLocalDirs = {}", appId,
cleanupLocalDirs);
+ // TODO: [SPARK-33236] Change the message when this service is able to
handle NM restart
+ AppAttemptPathsInfo appAttemptPathsInfo =
Preconditions.checkNotNull(appsPathsInfo.remove(appId),
+ "application " + appId + " is not registered or NM was restarted.");
+ cleanupShufflePartitionInfo(appId, -1);
Review comment:
@mridulm FYI, I have not yet added the unit test for this memory leak in
this PR update. I am posting the majority of the required changes first and
will add the test tomorrow.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]