zhouyejoe commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r671536035
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -210,76 +244,55 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI
}
}
- /**
- * The logic here is consistent with
- * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
- * org.apache.spark.storage.BlockId, scala.Option)]]
- */
- private File getFile(String appId, String filename) {
- // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
- AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
- "application " + appId + " is not registered or NM was restarted.");
- File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
- appPathsInfo.subDirsPerLocalDir, filename);
- logger.debug("Get merged file {}", targetFile.getAbsolutePath());
- return targetFile;
- }
-
- private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) {
- String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId));
- return getFile(appShuffleId.appId, fileName);
- }
-
- private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) {
- String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId));
- return getFile(appShuffleId.appId, indexName);
- }
-
- private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) {
- String metaName = String.format("%s.meta", generateFileName(appShuffleId, reduceId));
- return getFile(appShuffleId.appId, metaName);
- }
-
@Override
public String[] getMergedBlockDirs(String appId) {
- AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
- "application " + appId + " is not registered or NM was restarted.");
- String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
- "application " + appId
- + " active local dirs list has not been updated by any executor registration");
+ AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+ String[] activeLocalDirs =
+ Preconditions.checkNotNull(appShuffleInfo.appPathsInfo.activeLocalDirs,
Review comment:
I think we can safely remove this check. In our internal version, we create
the `AppPathsInfo` in `registerApplication` and only set `activeLocalDirs`
later, during `executorRegistration`, so `activeLocalDirs` can be null in
between. Upstream, we create the `AppPathsInfo` during `executorRegistration`
itself, so `activeLocalDirs` is never null here.
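
To illustrate the difference between the two flows, here is a minimal sketch.
It uses hypothetical stand-in classes (`TwoStepRegistry`, `OneStepRegistry`, and
a stripped-down `AppPathsInfo`), not the actual `RemoteBlockPushResolver` code,
and it assumes that executor registration always supplies a non-null dirs array.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AppPathsLifecycleSketch {
  // Hypothetical stand-in for AppPathsInfo; only the field under discussion.
  static final class AppPathsInfo {
    volatile String[] activeLocalDirs;

    AppPathsInfo(String[] activeLocalDirs) {
      this.activeLocalDirs = activeLocalDirs;
    }
  }

  // Internal-style flow: the entry is created at application registration
  // with no dirs, and the dirs are filled in later by an executor.
  static final class TwoStepRegistry {
    final Map<String, AppPathsInfo> apps = new ConcurrentHashMap<>();

    void registerApplication(String appId) {
      apps.put(appId, new AppPathsInfo(null));
    }

    // Assumes registerApplication(appId) has already been called.
    void executorRegistration(String appId, String[] localDirs) {
      apps.get(appId).activeLocalDirs = localDirs;
    }
  }

  // Upstream-style flow: the entry is created during executor registration,
  // so it never exists without its dirs.
  static final class OneStepRegistry {
    final Map<String, AppPathsInfo> apps = new ConcurrentHashMap<>();

    void executorRegistration(String appId, String[] localDirs) {
      apps.computeIfAbsent(appId, id -> new AppPathsInfo(localDirs));
    }
  }

  public static void main(String[] args) {
    TwoStepRegistry internal = new TwoStepRegistry();
    internal.registerApplication("app-1");
    // Window in which the entry exists but activeLocalDirs is still null.
    System.out.println(internal.apps.get("app-1").activeLocalDirs == null);

    OneStepRegistry upstream = new OneStepRegistry();
    upstream.executorRegistration("app-1", new String[] {"/tmp/dir-0"});
    // Any entry that exists already carries its dirs.
    System.out.println(upstream.apps.get("app-1").activeLocalDirs == null);
  }
}
```

In the one-step flow, the null check on `activeLocalDirs` only guards a state
that cannot be reached, which is why it looks safe to drop.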