mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990756966
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1410,26 +1431,27 @@ public String toString() {
* required for the shuffles of indeterminate stages.
*/
public static class AppShuffleMergePartitionsInfo {
Review Comment:
Revert changes to this class? (Assuming the changes I sketched above are fine.)
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -650,24 +666,28 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
} else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
// If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
// empty MergeStatuses but cleanup the older shuffleMergeId files.
+ Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions = mergePartitionsInfo.shuffleMergePartitions;
submitCleanupTask(() ->
- closeAndDeleteOutdatedPartitions(
- appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+ closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, shuffleMergePartitions));
} else {
// This block covers:
// 1. finalization of determinate stage
// 2. finalization of indeterminate stage if the shuffleMergeId related to it is the one
// for which the message is received.
shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
}
+ } else {
+ mergePartitionsInfo = new AppShuffleMergePartitionsInfo(shuffleId, true);
}
+ mergePartitionsInfo.setFinalized(true);
// Update the DB for the finalized shuffle
writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
// Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results
// sent to the driver will be empty. This can happen when the service didn't receive any
// blocks for the shuffle yet and the driver didn't wait for enough time to finalize the
// shuffle.
- return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+ return mergePartitionsInfo;
Review Comment:
Revert changes to this method? (Assuming the changes I sketched above are fine.)
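For reference on the `shuffleMergePartitions` local added in this hunk: with the new `else` branch, `mergePartitionsInfo` is reassigned, so it is presumably no longer effectively final, and Java only lets a lambda capture local variables and parameters that are final or effectively final. Copying the field into a separate local before building the `submitCleanupTask` lambda is the usual workaround. Below is a minimal sketch of that pattern under simplified assumptions: `MergeInfo`, `cleanupQueue` and `finalizeMerge` are hypothetical stand-ins, not Spark APIs, and persistence (`writeAppAttemptShuffleMergeInfoToDB`) as well as the branches elided from the hunk are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EffectivelyFinalCapture {

  // Hypothetical, simplified stand-in for AppShuffleMergePartitionsInfo.
  static final class MergeInfo {
    final int shuffleMergeId;
    final Map<Integer, String> partitions;
    boolean finalized;

    MergeInfo(int shuffleMergeId, Map<Integer, String> partitions) {
      this.shuffleMergeId = shuffleMergeId;
      this.partitions = partitions;
    }
  }

  // Hypothetical stand-in for submitCleanupTask: tasks are queued, not run.
  static final List<Runnable> cleanupQueue = new ArrayList<>();

  static MergeInfo finalizeMerge(MergeInfo info, int requestedMergeId) {
    if (info != null) {
      if (requestedMergeId > info.shuffleMergeId) {
        // `info` is reassigned in the else branch below, so it is not
        // effectively final and javac rejects capturing it in a lambda.
        // Copy the field into an effectively final local and capture that.
        Map<Integer, String> stalePartitions = info.partitions;
        cleanupQueue.add(() -> stalePartitions.clear());
      }
    } else {
      // No prior state: create an empty entry. This reassignment is what
      // makes `info` lose its effectively final status.
      info = new MergeInfo(requestedMergeId, new HashMap<>());
    }
    info.finalized = true;
    return info;
  }

  public static void main(String[] args) {
    Map<Integer, String> parts = new HashMap<>();
    parts.put(0, "partition-0");
    MergeInfo old = new MergeInfo(0, parts);

    finalizeMerge(old, 1);                    // queues cleanup of merge id 0
    MergeInfo fresh = finalizeMerge(null, 0); // no prior state for the shuffle

    cleanupQueue.forEach(Runnable::run);
    System.out.println(parts.isEmpty());                                    // true
    System.out.println(fresh.finalized + " " + fresh.partitions.isEmpty()); // true true
  }
}
```

Writing `cleanupQueue.add(() -> info.partitions.clear())` instead would not compile, because `info` is reassigned in the `else` branch.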