zhouyejoe commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r640999074
##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -117,64 +123,85 @@ public ShuffleIndexInformation load(File file) throws IOException {
* shuffle does not exist, initializes the metadata.
*/
private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
- AppShuffleId appShuffleId,
+ AppAttemptShuffleId appAttemptShuffleId,
+ AppAttemptPathsInfo appAttemptPathsInfo,
int reduceId) {
- File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
- if (!partitions.containsKey(appShuffleId) && dataFile.exists()) {
+ File dataFile = getMergedShuffleDataFile(
+ appAttemptPathsInfo, appAttemptShuffleId.appId, appAttemptShuffleId.shuffleId, reduceId);
+ if (!partitions.containsKey(appAttemptShuffleId) && dataFile.exists()) {
// If this partition is already finalized then the partitions map will not contain
// the appShuffleId but the data file would exist. In that case the block is considered late.
return null;
}
- Map<Integer, AppShufflePartitionInfo> shufflePartitions =
- partitions.computeIfAbsent(appShuffleId, id -> Maps.newConcurrentMap());
- return shufflePartitions.computeIfAbsent(reduceId, key -> {
- // It only gets here when the key is not present in the map. This could either
- // be the first time the merge manager receives a pushed block for a given application
- // shuffle partition, or after the merged shuffle file is finalized. We handle these
- // two cases accordingly by checking if the file already exists.
- File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
- File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId);
- try {
- if (dataFile.exists()) {
- return null;
- } else {
- return newAppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile);
- }
- } catch (IOException e) {
- logger.error(
- "Cannot create merged shuffle partition with data file {}, index file {}, and "
- + "meta file {}", dataFile.getAbsolutePath(),
- indexFile.getAbsolutePath(), metaFile.getAbsolutePath());
- throw new RuntimeException(
- String.format("Cannot initialize merged shuffle partition for appId %s shuffleId %s "
- + "reduceId %s", appShuffleId.appId, appShuffleId.shuffleId, reduceId), e);
+ // While processing the application remove, where the shuffle partitions info for the specific
+ // application will be cleaned up, this method will still be called to create new partitions
+ // as of receiving the push blocks. To avoid the potential memory leak, before creating the
+ // empty hashmap for storing the shuffle partitions information in the partitions hashmap,
+ // we need to make sure that the entry for the specific application must still exist in
+ // appAttemptsPathInfo hashmap. Otherwise, the push blocks should be ignored.
Review comment:
Discussed offline. Even if a context switch happens after line 150 and applicationRemove then removes this key/value pair from the partitions hashmap, the shufflePartitions AtomicReference won't be null, and files will still be created for the merged shuffle. However, the newly created hashmap is referenced only locally within this method, so the JVM will garbage-collect it once the method returns.
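The guard described in the diff comment, together with the interleaving discussed above, can be sketched as follows. This is a minimal, hypothetical model, not Spark's implementation: `PushMergeGuardSketch`, `lookupShufflePartitions`, `getOrCreatePartition`, `applicationRemoved` and `replayRemoveRace` are illustrative names, plain strings stand in for `AppAttemptPathsInfo` and `AppShufflePartitionInfo`, and an `(appId, shuffleId)` pair stands in for `AppAttemptShuffleId`. It assumes the per-application check is done with `ConcurrentHashMap.computeIfPresent`, whose whole invocation is performed atomically, and it models only the in-memory maps, not the merged shuffle files.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the guard; strings stand in for the real info classes.
class PushMergeGuardSketch {
  // appId -> stand-in for the application's paths info
  final Map<String, String> appsPathInfo = new ConcurrentHashMap<>();
  // (appId, shuffleId) -> reduceId -> stand-in for the partition info
  final Map<SimpleImmutableEntry<String, Integer>, Map<Integer, String>> partitions =
      new ConcurrentHashMap<>();

  // Step 1: create the per-shuffle map only while the application is still registered.
  // computeIfPresent is atomic, so a concurrent remove(appId) cannot interleave with the lambda.
  Map<Integer, String> lookupShufflePartitions(String appId, int shuffleId) {
    AtomicReference<Map<Integer, String>> shufflePartitions = new AtomicReference<>();
    appsPathInfo.computeIfPresent(appId, (id, info) -> {
      shufflePartitions.set(partitions.computeIfAbsent(
          new SimpleImmutableEntry<>(appId, shuffleId), k -> new ConcurrentHashMap<>()));
      return info; // leave the application entry unchanged
    });
    return shufflePartitions.get(); // null means the application was already removed
  }

  // Step 2: create or fetch the partition entry; null means "ignore the pushed block".
  String getOrCreatePartition(String appId, int shuffleId, int reduceId) {
    Map<Integer, String> shufflePartitions = lookupShufflePartitions(appId, shuffleId);
    if (shufflePartitions == null) {
      return null;
    }
    return shufflePartitions.computeIfAbsent(
        reduceId, r -> "partition-" + appId + "-" + shuffleId + "-" + r);
  }

  // Hypothetical removal path: drops the app entry, then all of its shuffle maps.
  void applicationRemoved(String appId) {
    appsPathInfo.remove(appId);
    partitions.keySet().removeIf(k -> k.getKey().equals(appId));
  }

  // Forces the interleaving from the review comment: the push thread finishes step 1,
  // then applicationRemoved runs before the push thread resumes with step 2.
  static boolean replayRemoveRace() {
    PushMergeGuardSketch resolver = new PushMergeGuardSketch();
    resolver.appsPathInfo.put("app1", "/tmp/app1");
    Map<Integer, String> local = resolver.lookupShufflePartitions("app1", 0);
    resolver.applicationRemoved("app1");
    local.computeIfAbsent(7, r -> "partition-app1-0-" + r);
    // The orphaned map is still usable, but the resolver no longer references it.
    return local.size() == 1 && resolver.partitions.isEmpty();
  }

  public static void main(String[] args) {
    PushMergeGuardSketch resolver = new PushMergeGuardSketch();
    resolver.appsPathInfo.put("app1", "/tmp/app1");
    System.out.println(resolver.getOrCreatePartition("app1", 0, 3)); // partition-app1-0-3
    resolver.applicationRemoved("app1");
    System.out.println(resolver.getOrCreatePartition("app1", 0, 3)); // null
    System.out.println(replayRemoveRace()); // true
  }
}
```

In the replay, the map handed back by the guarded lookup is non-null and the push thread can still write into it, but after `applicationRemoved` has run, `partitions` no longer points to it, so it becomes eligible for garbage collection once the local reference goes away. The sketch says nothing about the files that the real code creates in the same window.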
--
This is an automated message from the Apache Git Service.