venkata91 commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r663595592
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2225,4 +2225,14 @@ package object config {
.stringConf
.toSequence
.createWithDefault(Nil)
+
+ private[spark] val APP_ATTEMPT_ID =
+ ConfigBuilder("spark.app.attempt.id")
Review comment:
nit: This feels like it should be `spark.app.attemptNumber` rather than
`spark.app.attempt.id`. Also, this is currently specific to push-based shuffle,
right? Should we mention that in the documentation?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -210,76 +236,54 @@ public ManagedBuffer getMergedBlockData(String appId, int
shuffleId, int reduceI
}
}
- /**
- * The logic here is consistent with
- * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
- * org.apache.spark.storage.BlockId, scala.Option)]]
- */
- private File getFile(String appId, String filename) {
- // TODO: [SPARK-33236] Change the message when this service is able to
handle NM restart
- AppPathsInfo appPathsInfo =
Preconditions.checkNotNull(appsPathInfo.get(appId),
- "application " + appId + " is not registered or NM was restarted.");
- File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
- appPathsInfo.subDirsPerLocalDir, filename);
- logger.debug("Get merged file {}", targetFile.getAbsolutePath());
- return targetFile;
- }
-
- private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int
reduceId) {
- String fileName = String.format("%s.data", generateFileName(appShuffleId,
reduceId));
- return getFile(appShuffleId.appId, fileName);
- }
-
- private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int
reduceId) {
- String indexName = String.format("%s.index",
generateFileName(appShuffleId, reduceId));
- return getFile(appShuffleId.appId, indexName);
- }
-
- private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int
reduceId) {
- String metaName = String.format("%s.meta", generateFileName(appShuffleId,
reduceId));
- return getFile(appShuffleId.appId, metaName);
- }
-
@Override
public String[] getMergedBlockDirs(String appId) {
- AppPathsInfo appPathsInfo =
Preconditions.checkNotNull(appsPathInfo.get(appId),
- "application " + appId + " is not registered or NM was restarted.");
- String[] activeLocalDirs =
Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
- "application " + appId
- + " active local dirs list has not been updated by any executor
registration");
+ AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+ String[] activeLocalDirs =
+ Preconditions.checkNotNull(appShuffleInfo.appPathsInfo.activeLocalDirs,
+ "application " + appId + " active local dirs list has not been updated
" +
+ "by any executor registration");
return activeLocalDirs;
}
@Override
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
logger.info("Application {} removed, cleanupLocalDirs = {}", appId,
cleanupLocalDirs);
- // TODO: [SPARK-33236] Change the message when this service is able to
handle NM restart
- AppPathsInfo appPathsInfo =
Preconditions.checkNotNull(appsPathInfo.remove(appId),
- "application " + appId + " is not registered or NM was restarted.");
- Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>>
iterator =
- partitions.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry =
iterator.next();
- AppShuffleId appShuffleId = entry.getKey();
- if (appId.equals(appShuffleId.appId)) {
- iterator.remove();
- for (AppShufflePartitionInfo partitionInfo :
entry.getValue().values()) {
+ AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId);
+ if (null != appShuffleInfo) {
+ mergedShuffleCleanerExecutor.execute(
+ () -> cleanupMergedShuffle(appShuffleInfo, cleanupLocalDirs));
+ }
+ }
+
+
+ /**
+ * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo.
+ * If cleanupLocalDirs is true, the merged shuffle files will also be
deleted.
+ * The cleanup will be executed in a separate thread.
+ */
+ private void cleanupMergedShuffle(
Review comment:
nit: Maybe rename this to `closeAndDeletePartitionFilesIfNeeded`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]