mridulm commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r638422058
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -211,63 +214,74 @@ public ManagedBuffer getMergedBlockData(String appId, int
shuffleId, int reduceI
/**
* The logic here is consistent with
- * org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
+ * @see
[[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(org.apache.spark.storage.BlockId)]]
*/
private File getFile(String appId, String filename) {
// TODO: [SPARK-33236] Change the message when this service is able to
handle NM restart
- AppPathsInfo appPathsInfo =
Preconditions.checkNotNull(appsPathInfo.get(appId),
+ AppAttemptPathsInfo appAttemptPathsInfo =
Preconditions.checkNotNull(appsPathsInfo.get(appId),
"application " + appId + " is not registered or NM was restarted.");
- File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
- appPathsInfo.subDirsPerLocalDir, filename);
+ File targetFile =
ExecutorDiskUtils.getFile(appAttemptPathsInfo.activeLocalDirs,
+ appAttemptPathsInfo.subDirsPerLocalDir, filename);
logger.debug("Get merged file {}", targetFile.getAbsolutePath());
return targetFile;
}
- private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int
reduceId) {
- String fileName = String.format("%s.data", generateFileName(appShuffleId,
reduceId));
- return getFile(appShuffleId.appId, fileName);
+ private File getMergedShuffleDataFile(String appId, int shuffleId, int
reduceId) {
+ String fileName = String.format("%s.data", generateFileName(appId,
shuffleId, reduceId));
+ return getFile(appId, fileName);
}
- private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int
reduceId) {
- String indexName = String.format("%s.index",
generateFileName(appShuffleId, reduceId));
- return getFile(appShuffleId.appId, indexName);
+ private File getMergedShuffleIndexFile(String appId, int shuffleId, int
reduceId) {
+ String indexName = String.format("%s.index", generateFileName(appId,
shuffleId, reduceId));
+ return getFile(appId, indexName);
}
- private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int
reduceId) {
- String metaName = String.format("%s.meta", generateFileName(appShuffleId,
reduceId));
- return getFile(appShuffleId.appId, metaName);
+ private File getMergedShuffleMetaFile(String appId, int shuffleId, int
reduceId) {
+ String metaName = String.format("%s.meta", generateFileName(appId,
shuffleId, reduceId));
+ return getFile(appId, metaName);
}
@Override
public String[] getMergedBlockDirs(String appId) {
- AppPathsInfo appPathsInfo =
Preconditions.checkNotNull(appsPathInfo.get(appId),
+ AppAttemptPathsInfo appAttemptPathsInfo =
Preconditions.checkNotNull(appsPathsInfo.get(appId),
"application " + appId + " is not registered or NM was restarted.");
- String[] activeLocalDirs =
Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
+ String[] activeLocalDirs =
Preconditions.checkNotNull(appAttemptPathsInfo.activeLocalDirs,
"application " + appId
+ " active local dirs list has not been updated by any executor
registration");
return activeLocalDirs;
}
- @Override
- public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
- logger.info("Application {} removed, cleanupLocalDirs = {}", appId,
cleanupLocalDirs);
- // TODO: [SPARK-33236] Change the message when this service is able to
handle NM restart
- AppPathsInfo appPathsInfo =
Preconditions.checkNotNull(appsPathInfo.remove(appId),
- "application " + appId + " is not registered or NM was restarted.");
- Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>>
iterator =
+ /**
+ * Clean up the AppShufflePartitionInfo for a specific application attempt.
+ * If attemptId is -1, it means to clean up all the AppShufflePartitionInfo
from
+ * all the attempts. Otherwise, only the AppShufflePartitionInfo from the
specific
+ * application attempt will be cleaned up.
+ */
+ private void cleanupShufflePartitionInfo(String appId, int attemptId) {
+ Iterator<Map.Entry<AppAttemptShuffleId, Map<Integer,
AppAttemptShufflePartitionInfo>>> iterator =
partitions.entrySet().iterator();
while (iterator.hasNext()) {
- Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry =
iterator.next();
- AppShuffleId appShuffleId = entry.getKey();
- if (appId.equals(appShuffleId.appId)) {
+ Map.Entry<AppAttemptShuffleId, Map<Integer,
AppAttemptShufflePartitionInfo>> entry = iterator.next();
+ AppAttemptShuffleId appAttemptShuffleId = entry.getKey();
+ if (appId.equals(appAttemptShuffleId.appId)
+ && (attemptId == -1 || attemptId == appAttemptShuffleId.attemptId)) {
iterator.remove();
- for (AppShufflePartitionInfo partitionInfo :
entry.getValue().values()) {
+ for (AppAttemptShufflePartitionInfo partitionInfo :
entry.getValue().values()) {
partitionInfo.closeAllFiles();
}
}
}
+ }
+
+ @Override
+ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+ logger.info("Application {} removed, cleanupLocalDirs = {}", appId,
cleanupLocalDirs);
+ // TODO: [SPARK-33236] Change the message when this service is able to
handle NM restart
+ AppAttemptPathsInfo appAttemptPathsInfo =
Preconditions.checkNotNull(appsPathsInfo.remove(appId),
+ "application " + appId + " is not registered or NM was restarted.");
+ cleanupShufflePartitionInfo(appId, -1);
Review comment:
There is a race between this cleanup and an executor concurrently adding an
entry into the `partitions` map.
In a nutshell, we need to maintain an `applications` `Set` that contains the
currently running apps, and all updates/checks on this `Set` should be done
inside a `synchronized` block.
a) In `registerExecutor`, add to this `Set`
b) In `applicationRemoved`, remove from this `Set` before doing
`cleanupShufflePartitionInfo`.
c) In `getOrCreateAppShufflePartitionInfo`, inside `partitions.computeIfAbsent`,
lock `applications` and check whether the application of the incoming request
is in this `Set`; if it is missing, return `null` (and handle a `null` value
for `shufflePartitions`).
This handles the race condition of `getOrCreateAppShufflePartitionInfo`
(from a remote executor) overlapping with `applicationRemoved` (on local NM).
This also means the race condition @otterc referenced
[above](https://github.com/apache/spark/pull/32007/files#r638325094) will not
occur: application removal ensures all records are cleaned up, so stale
entries can live at most until application termination, not after.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]