otterc commented on a change in pull request #30433:
URL: https://github.com/apache/spark/pull/30433#discussion_r534352437
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -877,7 +886,7 @@ void closeAllFiles() {
}
Review comment:
Yes, but I think truncating the data file twice while finalizing is
unnecessary. Also, we can't simply ignore exceptions thrown while truncating
the three files (data, meta, and index) during close. I think truncation of
these files should be separated from `closeAllFiles`.
This is the change I have made:
```
public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
  ...
    while (partitionsIter.hasNext()) {
      AppShufflePartitionInfo partition = partitionsIter.next();
      synchronized (partition) {
        try {
          partition.finalizePartition();
          bitmaps[idx] = partition.mapTracker;
          reduceIds[idx] = partition.reduceId;
          sizes[idx++] = partition.getPosition();
        } catch (IOException ioe) {
          logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId,
            msg.shuffleId, partition.reduceId, ioe);
        } finally {
          partition.closeAllFiles();
          // The partition should be removed after the files are written so that any new stream
          // for the same reduce partition will see that the data file exists.
          partitionsIter.remove();
        }
      }
    }
    mergeStatuses = new MergeStatuses(msg.shuffleId, bitmaps, reduceIds, sizes);
  }
  partitions.remove(appShuffleId);
  logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
  return mergeStatuses;
}

private void finalizePartition() throws IOException {
  if (position != lastChunkOffset) {
    try {
      updateChunkInfo(position, lastMergedMapIndex);
    } catch (IOException ioe) {
      if (seekFailed) {
        // If seek has failed then fail finalize of this partition by propagating the exception.
        throw ioe;
      }
      // If seek hasn't failed then ignore this exception.
    }
  }
  // Get rid of any partial block data at the end of the file. This could either
  // be due to failure or a request still being processed when the shuffle
  // merge gets finalized.
  dataChannel.truncate(lastChunkOffset);
  metaFile.setLength(metaFile.getFilePointer());
  indexFile.setLength(indexFile.getFilePointer());
}
```
Any exception thrown while truncating the data, meta, or index file during
finalize now propagates to the caller, and that partition will not be
considered merged.
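To illustrate the split between truncation and close in isolation, here is a
minimal, self-contained sketch. It is not the actual patch:
`PartitionFilesSketch` and `finalizeAndClose` are hypothetical names, only a
single data file is modeled, and the real `AppShufflePartitionInfo` also tracks
meta and index files. The point is that a truncation failure propagates and
marks the partition as not merged, while the files are still closed in
`finally`.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for a merged partition; not the real AppShufflePartitionInfo.
class PartitionFilesSketch {
  private final RandomAccessFile dataFile;
  private final FileChannel dataChannel;
  private final long lastChunkOffset;

  PartitionFilesSketch(Path dataPath, long lastChunkOffset) throws IOException {
    this.dataFile = new RandomAccessFile(dataPath.toFile(), "rw");
    this.dataChannel = dataFile.getChannel();
    this.lastChunkOffset = lastChunkOffset;
  }

  // Truncation is done here, and any IOException propagates to the caller.
  void finalizePartition() throws IOException {
    dataChannel.truncate(lastChunkOffset);
  }

  // Close only releases the handle; a failure here is ignored in this sketch.
  void closeAllFiles() {
    try {
      dataFile.close();
    } catch (IOException ioe) {
      // Nothing useful can be done at this point.
    }
  }

  // Returns true if the partition was finalized, false if truncation failed.
  static boolean finalizeAndClose(PartitionFilesSketch partition) {
    try {
      partition.finalizePartition();
      return true;
    } catch (IOException ioe) {
      return false; // the caller treats this partition as not merged
    } finally {
      partition.closeAllFiles();
    }
  }

  public static void main(String[] args) throws IOException {
    Path data = Files.createTempFile("merged", ".data");
    Files.write(data, new byte[10]); // 10 bytes written, last complete chunk ends at 6
    boolean ok = finalizeAndClose(new PartitionFilesSketch(data, 6));
    System.out.println(ok + " " + Files.size(data));
    Files.delete(data);
  }
}
```

With this shape, `closeAllFiles` no longer needs to know about truncation at
all, so the data file is truncated once per finalize rather than twice.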