mridulm commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r638402695
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -211,63 +214,74 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI
   /**
    * The logic here is consistent with
-   * org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
+   * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(org.apache.spark.storage.BlockId)]]
    */
   private File getFile(String appId, String filename) {
Review comment:
Pass the attempt id for the request along here, and validate that the
`AppAttemptPathsInfo` fetched from `appsPathsInfo` is for the right attempt.
Otherwise we can end up with a race between the initial validation and `getFile`.
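To illustrate the suggested validation, here is a minimal standalone sketch (with hypothetical simplified types, not the actual Spark classes): threading the request's attempt id into the lookup lets the `getFile` path reject a stale entry instead of silently using a newer attempt's directories.

```java
import java.util.concurrent.ConcurrentHashMap;

public class GetFileSketch {
    // Hypothetical stand-in for the PR's AppAttemptPathsInfo.
    static class AppAttemptPathsInfo {
        final int attemptId;
        AppAttemptPathsInfo(int attemptId) { this.attemptId = attemptId; }
    }

    static final ConcurrentHashMap<String, AppAttemptPathsInfo> appsPathsInfo =
        new ConcurrentHashMap<>();

    // Passing the request's attemptId lets the lookup reject an entry that was
    // replaced by a newer attempt between the initial validation and this call.
    static AppAttemptPathsInfo getPathsInfo(String appId, int attemptId) {
        AppAttemptPathsInfo info = appsPathsInfo.get(appId);
        if (info == null || info.attemptId != attemptId) {
            throw new IllegalStateException(
                "application " + appId + " attempt " + attemptId + " is not registered");
        }
        return info;
    }

    public static void main(String[] args) {
        appsPathsInfo.put("app-1", new AppAttemptPathsInfo(2));
        System.out.println("registered attempt: " + getPathsInfo("app-1", 2).attemptId);
        try {
            getPathsInfo("app-1", 1); // a request from the old attempt 1 must not match
        } catch (IllegalStateException e) {
            System.out.println("rejected stale attempt");
        }
    }
}
```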
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -415,24 +429,65 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
           bitmaps.toArray(new RoaringBitmap[bitmaps.size()]),
           Ints.toArray(reduceIds),
           Longs.toArray(sizes));
       }
-      partitions.remove(appShuffleId);
-      logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
+      partitions.remove(appAttemptShuffleId);
+      logger.info("Finalized shuffle {} from Application {}_{}.", msg.shuffleId, msg.appId, msg.attemptId);
       return mergeStatuses;
   }

   @Override
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-        executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs),
+        executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
     }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(":")) {
+      String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        MergeDirectoryMeta mergeDirectoryMeta = mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class);
+        if (mergeDirectoryMeta.attemptId == -1) {
+          // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge dirs
+          appsPathsInfo.computeIfAbsent(appId, id ->
+            new AppAttemptPathsInfo(appId, mergeDirectoryMeta.attemptId, executorInfo.localDirs,
+              mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir));
+        } else {
+          // If attemptId is not -1, there is an attemptId stored in the ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application attempt will
+          // register the merge dirs in Shuffle Service. Any later ExecutorRegister message
+          // from the same application attempt will not override the merge dirs. But it can
+          // be overridden by an ExecutorRegister message from a newer application attempt.
+          // The former attempt's shuffle partitions information will also be cleaned up.
+          boolean newAttemptRegistered = false;
+          if (appsPathsInfo.containsKey(appId)
+              && mergeDirectoryMeta.attemptId > appsPathsInfo.get(appId).attemptId) {
+            newAttemptRegistered = true;
+          }
+          appsPathsInfo.compute(appId, (id, appAttemptPathsInfo) -> {
+            if (appAttemptPathsInfo == null
+                || (appAttemptPathsInfo != null
+                    && mergeDirectoryMeta.attemptId > appAttemptPathsInfo.attemptId)) {
+              appAttemptPathsInfo = new AppAttemptPathsInfo(appId, mergeDirectoryMeta.attemptId,
+                executorInfo.localDirs, mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir);
+            }
+            return appAttemptPathsInfo;
+          });
+          // It is safe to clean up the AppShufflePartitionInfo
+          if (newAttemptRegistered) {
Review comment:
We should not rely on `newAttemptRegistered` here; instead, check whether
`compute` actually added an entry.
For example, something like this:
```suggestion
          AtomicBoolean newAttemptRegistered = new AtomicBoolean(false);
          appsPathsInfo.compute(appId, (id, appAttemptPathsInfo) -> {
            if (appAttemptPathsInfo == null
                || (appAttemptPathsInfo != null
                    && mergeDirectoryMeta.attemptId > appAttemptPathsInfo.attemptId)) {
              appAttemptPathsInfo = new AppAttemptPathsInfo(appId, mergeDirectoryMeta.attemptId,
                executorInfo.localDirs, mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir);
              newAttemptRegistered.set(true);
            }
            return appAttemptPathsInfo;
          });
          // It is safe to clean up the AppShufflePartitionInfo
          if (newAttemptRegistered.get()) {
```
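The distinction matters because the check-then-act pair (`containsKey` followed by `compute`) can observe a different map state than the one `compute` atomically acts on. A standalone sketch of the flag-inside-`compute` pattern, using plain `Integer` attempt ids instead of the PR's `AppAttemptPathsInfo`:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class ComputeFlagSketch {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> attempts = new ConcurrentHashMap<>();
        attempts.put("app-1", 1);

        int incomingAttempt = 2;
        AtomicBoolean newAttemptRegistered = new AtomicBoolean(false);
        // The flag is set inside compute(), so it reflects exactly what the
        // atomic update did -- not a possibly stale containsKey() observation.
        attempts.compute("app-1", (id, current) -> {
            if (current == null || incomingAttempt > current) {
                newAttemptRegistered.set(true);
                return incomingAttempt;
            }
            return current;
        });
        System.out.println("newAttemptRegistered: " + newAttemptRegistered.get());
    }
}
```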
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -211,63 +214,74 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI
   /**
    * The logic here is consistent with
-   * org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
+   * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(org.apache.spark.storage.BlockId)]]
    */
   private File getFile(String appId, String filename) {
     // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
+    AppAttemptPathsInfo appAttemptPathsInfo = Preconditions.checkNotNull(appsPathsInfo.get(appId),
       "application " + appId + " is not registered or NM was restarted.");
-    File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
-      appPathsInfo.subDirsPerLocalDir, filename);
+    File targetFile = ExecutorDiskUtils.getFile(appAttemptPathsInfo.activeLocalDirs,
+      appAttemptPathsInfo.subDirsPerLocalDir, filename);
     logger.debug("Get merged file {}", targetFile.getAbsolutePath());
     return targetFile;
   }

-  private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) {
-    String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, fileName);
+  private File getMergedShuffleDataFile(String appId, int shuffleId, int reduceId) {
+    String fileName = String.format("%s.data", generateFileName(appId, shuffleId, reduceId));
+    return getFile(appId, fileName);
   }

-  private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) {
-    String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, indexName);
+  private File getMergedShuffleIndexFile(String appId, int shuffleId, int reduceId) {
+    String indexName = String.format("%s.index", generateFileName(appId, shuffleId, reduceId));
+    return getFile(appId, indexName);
   }

-  private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) {
-    String metaName = String.format("%s.meta", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, metaName);
+  private File getMergedShuffleMetaFile(String appId, int shuffleId, int reduceId) {
+    String metaName = String.format("%s.meta", generateFileName(appId, shuffleId, reduceId));
+    return getFile(appId, metaName);
   }

   @Override
   public String[] getMergedBlockDirs(String appId) {
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
+    AppAttemptPathsInfo appAttemptPathsInfo = Preconditions.checkNotNull(appsPathsInfo.get(appId),
       "application " + appId + " is not registered or NM was restarted.");
-    String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
+    String[] activeLocalDirs = Preconditions.checkNotNull(appAttemptPathsInfo.activeLocalDirs,
       "application " + appId
         + " active local dirs list has not been updated by any executor registration");
     return activeLocalDirs;
   }

-  @Override
-  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
-    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
-    // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> iterator =
+  /**
+   * Clean up the AppShufflePartitionInfo for a specific application attempt.
+   * If attemptId is -1, it means to clean up all the AppShufflePartitionInfo from
+   * all the attempts. Otherwise, only the AppShufflePartitionInfo from the specific
+   * application attempt will be cleaned up.
+   */
+  private void cleanupShufflePartitionInfo(String appId, int attemptId) {
+    Iterator<Map.Entry<AppAttemptShuffleId, Map<Integer, AppAttemptShufflePartitionInfo>>> iterator =
       partitions.entrySet().iterator();
     while (iterator.hasNext()) {
-      Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry = iterator.next();
-      AppShuffleId appShuffleId = entry.getKey();
-      if (appId.equals(appShuffleId.appId)) {
+      Map.Entry<AppAttemptShuffleId, Map<Integer, AppAttemptShufflePartitionInfo>> entry = iterator.next();
+      AppAttemptShuffleId appAttemptShuffleId = entry.getKey();
+      if (appId.equals(appAttemptShuffleId.appId)
+          && (attemptId == -1 || attemptId == appAttemptShuffleId.attemptId)) {
Review comment:
Add a constant for `-1` here (and probably change it from `-1`, given we
already use `-1` for an unknown attempt id) to describe the expected
behavior (remove all attempts for the application).
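A sketch of what such constants could look like (the names `UNDEFINED_ATTEMPT_ID` and `DELETE_ALL_ATTEMPTS` are placeholders; the PR would choose its own), with the removal predicate pulled out so the sentinel's meaning is self-documenting:

```java
public class AttemptConstants {
    // Hypothetical names. Using a sentinel distinct from -1 avoids conflating
    // "app registered without an attempt id" with "remove all attempts".
    public static final int UNDEFINED_ATTEMPT_ID = -1;
    public static final int DELETE_ALL_ATTEMPTS = -2;

    // Mirrors the condition in cleanupShufflePartitionInfo, but with the
    // sentinel named instead of a bare -1 literal.
    static boolean shouldRemove(String appId, int requestedAttemptId,
                                String entryAppId, int entryAttemptId) {
        return appId.equals(entryAppId)
            && (requestedAttemptId == DELETE_ALL_ATTEMPTS
                || requestedAttemptId == entryAttemptId);
    }

    public static void main(String[] args) {
        System.out.println("remove all attempts: "
            + shouldRemove("app-1", DELETE_ALL_ATTEMPTS, "app-1", 3));
        System.out.println("different attempt: "
            + shouldRemove("app-1", 2, "app-1", 3));
    }
}
```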
##########
File path: core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
##########
@@ -644,15 +644,15 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
val tempDir2 = Utils.createTempDir()
val sourceFile1 = new File(tempDir2, "foo.txt")
- Files.touch(sourceFile1)
+ com.google.common.io.Files.touch(sourceFile1)
Review comment:
Can we revert these changes? They do not look related to this PR.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -211,63 +214,74 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI
   /**
    * The logic here is consistent with
-   * org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
+   * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(org.apache.spark.storage.BlockId)]]
    */
   private File getFile(String appId, String filename) {
     // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
+    AppAttemptPathsInfo appAttemptPathsInfo = Preconditions.checkNotNull(appsPathsInfo.get(appId),
       "application " + appId + " is not registered or NM was restarted.");
-    File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
-      appPathsInfo.subDirsPerLocalDir, filename);
+    File targetFile = ExecutorDiskUtils.getFile(appAttemptPathsInfo.activeLocalDirs,
+      appAttemptPathsInfo.subDirsPerLocalDir, filename);
     logger.debug("Get merged file {}", targetFile.getAbsolutePath());
     return targetFile;
   }

-  private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) {
-    String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, fileName);
+  private File getMergedShuffleDataFile(String appId, int shuffleId, int reduceId) {
+    String fileName = String.format("%s.data", generateFileName(appId, shuffleId, reduceId));
+    return getFile(appId, fileName);
   }

-  private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) {
-    String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, indexName);
+  private File getMergedShuffleIndexFile(String appId, int shuffleId, int reduceId) {
+    String indexName = String.format("%s.index", generateFileName(appId, shuffleId, reduceId));
+    return getFile(appId, indexName);
   }

-  private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) {
-    String metaName = String.format("%s.meta", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, metaName);
+  private File getMergedShuffleMetaFile(String appId, int shuffleId, int reduceId) {
+    String metaName = String.format("%s.meta", generateFileName(appId, shuffleId, reduceId));
+    return getFile(appId, metaName);
   }

   @Override
   public String[] getMergedBlockDirs(String appId) {
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
+    AppAttemptPathsInfo appAttemptPathsInfo = Preconditions.checkNotNull(appsPathsInfo.get(appId),
       "application " + appId + " is not registered or NM was restarted.");
-    String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
+    String[] activeLocalDirs = Preconditions.checkNotNull(appAttemptPathsInfo.activeLocalDirs,
       "application " + appId
         + " active local dirs list has not been updated by any executor registration");
     return activeLocalDirs;
   }

-  @Override
-  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
-    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
-    // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> iterator =
+  /**
+   * Clean up the AppShufflePartitionInfo for a specific application attempt.
+   * If attemptId is -1, it means to clean up all the AppShufflePartitionInfo from
+   * all the attempts. Otherwise, only the AppShufflePartitionInfo from the specific
+   * application attempt will be cleaned up.
+   */
+  private void cleanupShufflePartitionInfo(String appId, int attemptId) {
+    Iterator<Map.Entry<AppAttemptShuffleId, Map<Integer, AppAttemptShufflePartitionInfo>>> iterator =
       partitions.entrySet().iterator();
     while (iterator.hasNext()) {
-      Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry = iterator.next();
-      AppShuffleId appShuffleId = entry.getKey();
-      if (appId.equals(appShuffleId.appId)) {
+      Map.Entry<AppAttemptShuffleId, Map<Integer, AppAttemptShufflePartitionInfo>> entry = iterator.next();
+      AppAttemptShuffleId appAttemptShuffleId = entry.getKey();
+      if (appId.equals(appAttemptShuffleId.appId)
+          && (attemptId == -1 || attemptId == appAttemptShuffleId.attemptId)) {
Review comment:
Also, should this be `appAttemptShuffleId.attemptId < attemptId`?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -45,18 +45,21 @@
private final TransportClient client;
private final String appId;
+ private final int attemptId;
private final String[] blockIds;
private final BlockFetchingListener listener;
private final Map<String, ManagedBuffer> buffers;
Review comment:
Review note: while this is a backward-incompatible change between 3.1 and
3.2, current code paths in ESS 3.1 can't/don't rely on it, so this change
should be fine.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -211,63 +214,74 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI
   /**
    * The logic here is consistent with
-   * org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
+   * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(org.apache.spark.storage.BlockId)]]
    */
   private File getFile(String appId, String filename) {
     // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
+    AppAttemptPathsInfo appAttemptPathsInfo = Preconditions.checkNotNull(appsPathsInfo.get(appId),
       "application " + appId + " is not registered or NM was restarted.");
-    File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
-      appPathsInfo.subDirsPerLocalDir, filename);
+    File targetFile = ExecutorDiskUtils.getFile(appAttemptPathsInfo.activeLocalDirs,
+      appAttemptPathsInfo.subDirsPerLocalDir, filename);
     logger.debug("Get merged file {}", targetFile.getAbsolutePath());
     return targetFile;
   }

-  private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) {
-    String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, fileName);
+  private File getMergedShuffleDataFile(String appId, int shuffleId, int reduceId) {
+    String fileName = String.format("%s.data", generateFileName(appId, shuffleId, reduceId));
+    return getFile(appId, fileName);
   }

-  private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) {
-    String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, indexName);
+  private File getMergedShuffleIndexFile(String appId, int shuffleId, int reduceId) {
+    String indexName = String.format("%s.index", generateFileName(appId, shuffleId, reduceId));
+    return getFile(appId, indexName);
   }

-  private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) {
-    String metaName = String.format("%s.meta", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, metaName);
+  private File getMergedShuffleMetaFile(String appId, int shuffleId, int reduceId) {
+    String metaName = String.format("%s.meta", generateFileName(appId, shuffleId, reduceId));
+    return getFile(appId, metaName);
   }

   @Override
   public String[] getMergedBlockDirs(String appId) {
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
+    AppAttemptPathsInfo appAttemptPathsInfo = Preconditions.checkNotNull(appsPathsInfo.get(appId),
       "application " + appId + " is not registered or NM was restarted.");
-    String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
+    String[] activeLocalDirs = Preconditions.checkNotNull(appAttemptPathsInfo.activeLocalDirs,
       "application " + appId
         + " active local dirs list has not been updated by any executor registration");
     return activeLocalDirs;
   }

-  @Override
-  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
-    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
-    // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> iterator =
+  /**
+   * Clean up the AppShufflePartitionInfo for a specific application attempt.
+   * If attemptId is -1, it means to clean up all the AppShufflePartitionInfo from
+   * all the attempts. Otherwise, only the AppShufflePartitionInfo from the specific
+   * application attempt will be cleaned up.
+   */
+  private void cleanupShufflePartitionInfo(String appId, int attemptId) {
+    Iterator<Map.Entry<AppAttemptShuffleId, Map<Integer, AppAttemptShufflePartitionInfo>>> iterator =
       partitions.entrySet().iterator();
     while (iterator.hasNext()) {
-      Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry = iterator.next();
-      AppShuffleId appShuffleId = entry.getKey();
-      if (appId.equals(appShuffleId.appId)) {
+      Map.Entry<AppAttemptShuffleId, Map<Integer, AppAttemptShufflePartitionInfo>> entry = iterator.next();
+      AppAttemptShuffleId appAttemptShuffleId = entry.getKey();
+      if (appId.equals(appAttemptShuffleId.appId)
+          && (attemptId == -1 || attemptId == appAttemptShuffleId.attemptId)) {
         iterator.remove();
-        for (AppShufflePartitionInfo partitionInfo : entry.getValue().values()) {
+        for (AppAttemptShufflePartitionInfo partitionInfo : entry.getValue().values()) {
           partitionInfo.closeAllFiles();
         }
       }
     }
+  }
+
+  @Override
+  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
+    // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
+    AppAttemptPathsInfo appAttemptPathsInfo = Preconditions.checkNotNull(appsPathsInfo.remove(appId),
+      "application " + appId + " is not registered or NM was restarted.");
+    cleanupShufflePartitionInfo(appId, -1);
Review comment:
There is a race between this cleanup and some executor adding an entry
into the partitions map.
In a nutshell, we need to maintain an `applications` `Set` which contains
the currently running apps:
a) In `registerExecutor`, add to this `Set`.
b) In `applicationRemoved`, remove from this `Set` before doing
`cleanupShufflePartitionInfo`.
c) In `getOrCreateAppShufflePartitionInfo`, in `partitions.computeIfAbsent`,
check if the application is in this `Set` before returning a valid `Map`;
else return `null` (and handle a `null` value for `shufflePartitions`).
This handles the race condition of `getOrCreateAppShufflePartitionInfo`
(from a remote executor) overlapping with `applicationRemoved` (on the local
NM).
This also means the race condition @otterc referenced
[above](https://github.com/apache/spark/pull/32007/files#r638325094) will not
occur: application removal will ensure all records are cleaned up, so stale
entries might live at most until application termination, not after.
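The steps a) to c) above can be sketched as follows (a minimal standalone sketch with simplified hypothetical types; the real `partitions` map holds `AppAttemptShufflePartitionInfo` values, not strings). The key point is that `computeIfAbsent` returning `null` when the app is no longer in the live set prevents a racing executor from resurrecting an entry after cleanup:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class AppLifecycleSketch {
    static final Set<String> applications = ConcurrentHashMap.newKeySet();
    static final ConcurrentHashMap<String, Map<Integer, String>> partitions =
        new ConcurrentHashMap<>();

    static void registerExecutor(String appId) {
        applications.add(appId);                  // (a) mark the app as live
    }

    static Map<Integer, String> getOrCreatePartitions(String appId) {
        // (c) refuse to create an entry for an app that is no longer live;
        // returning null from the mapping function records no mapping at all
        return partitions.computeIfAbsent(appId,
            id -> applications.contains(id) ? new ConcurrentHashMap<>() : null);
    }

    static void applicationRemoved(String appId) {
        applications.remove(appId);               // (b) remove BEFORE cleanup
        partitions.remove(appId);                 // stand-in for the real cleanup
    }

    public static void main(String[] args) {
        registerExecutor("app-1");
        System.out.println("live app gets partitions: "
            + (getOrCreatePartitions("app-1") != null));
        applicationRemoved("app-1");
        System.out.println("removed app gets partitions: "
            + (getOrCreatePartitions("app-1") != null));
    }
}
```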
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]