mridulm commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r677990832
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -410,17 +493,40 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
           + "with the current attempt id %s stored in shuffle service for application %s",
           msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
     }
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      appShuffleInfo.partitions.remove(msg.shuffleId);
+    Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions = null;
+    // Determinate stage can remove metadata of the shuffle Id as part of finalizing shuffle
+    // merge
+    if (msg.shuffleMergeId == 0) {
+      shuffleMergePartitions =
+        appShuffleInfo.partitions.remove(msg.shuffleId).shuffleMergePartitions;
+    } else {
+      // For indeterminate stage we will set shuffleMergePartitions of the shuffleId to
+      // INDETERMINATE_SHUFFLE_FINALIZED. This is needed so as to reject old shuffleMergeId
+      // push requests after the shuffle merge is finalized.
+      if (null == appShuffleInfo.partitions.get(msg.shuffleId) ||
+          INDETERMINATE_SHUFFLE_FINALIZED ==
+            appShuffleInfo.partitions.get(msg.shuffleId).shuffleMergePartitions) {
+        throw new RuntimeException(
+          String.format("Shuffle merge finalize request for shuffle %s with"
+            + " shuffleMergeId %s is %s", msg.shuffleId, msg.shuffleMergeId,
+            ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
+      } else if (msg.shuffleMergeId ==
+          appShuffleInfo.partitions.get(msg.shuffleId).shuffleMergeId) {
+        shuffleMergePartitions =
+          appShuffleInfo.partitions.get(msg.shuffleId).shuffleMergePartitions;
+        appShuffleInfo.partitions.get(msg.shuffleId).markIndeterminateShuffleFinalized();
+      }
Review comment:
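This is not thread safe: the repeated `get` calls followed by `markIndeterminateShuffleFinalized()` are a check-then-act sequence, so another thread can change the entry between the check and the update. You will need to do this atomically.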
```suggestion
      AtomicReference<Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitionsRef =
        new AtomicReference<>(null);
      appShuffleInfo.partitions.compute(msg.shuffleId, (id, value) -> {
        if (null == value || INDETERMINATE_SHUFFLE_FINALIZED == value.shuffleMergePartitions) {
          throw new RuntimeException(
            String.format("Shuffle merge finalize request for shuffle %s with"
              + " shuffleMergeId %s is %s", msg.shuffleId, msg.shuffleMergeId,
              ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
        } else if (msg.shuffleMergeId == value.shuffleMergeId) {
          // hand back the partitions map (the ref holds a Map, not the info object).
          shuffleMergePartitionsRef.set(value.shuffleMergePartitions);
          // replace with marker value.
          // Note: take constructor parameter true/false to specify whether it is finalized or not,
          // and make fields of AppShuffleMergePartitionsInfo final - remove
          // markIndeterminateShuffleFinalized
          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
        } else {
          // shuffle seq mismatch - keep existing value, no shuffle merge partitions to finalize.
          return value;
        }
      });
      shuffleMergePartitions = shuffleMergePartitionsRef.get();
```
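
To illustrate the pattern outside of Spark, here is a minimal, self-contained sketch of the same idea: do the check, the hand-off of the old partitions, and the swap to a finalized marker inside a single `ConcurrentHashMap.compute` call, which runs atomically for that key. The names `FinalizeSketch`, `MergeInfo`, and `finalizeMerge`, the `String` partition placeholder, and the `IllegalStateException` are all hypothetical stand-ins, not the classes or error handling in `RemoteBlockPushResolver`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class FinalizeSketch {

  /** Hypothetical immutable stand-in for AppShuffleMergePartitionsInfo. */
  public static final class MergeInfo {
    public final int shuffleMergeId;
    public final boolean finalized;
    // reducerId -> placeholder for per-partition state (assumption for this sketch)
    public final Map<Integer, String> partitions;

    public MergeInfo(int shuffleMergeId, boolean finalized, Map<Integer, String> partitions) {
      this.shuffleMergeId = shuffleMergeId;
      this.finalized = finalized;
      this.partitions = partitions;
    }
  }

  /**
   * Returns the partitions to finalize, or null when the merge id does not match.
   * Throws when the shuffle is unknown or already finalized (a stale request).
   */
  public static Map<Integer, String> finalizeMerge(
      ConcurrentHashMap<Integer, MergeInfo> partitions, int shuffleId, int shuffleMergeId) {
    AtomicReference<Map<Integer, String>> ref = new AtomicReference<>(null);
    // compute() performs the whole read-check-replace for this key atomically.
    partitions.compute(shuffleId, (id, value) -> {
      if (value == null || value.finalized) {
        // Throwing from the remapping function leaves the current mapping unchanged.
        throw new IllegalStateException("stale finalize request for shuffle " + shuffleId);
      } else if (value.shuffleMergeId == shuffleMergeId) {
        ref.set(value.partitions);
        // Swap in a new immutable marker instead of mutating the existing entry.
        return new MergeInfo(shuffleMergeId, true, Collections.emptyMap());
      } else {
        // Merge id mismatch: keep the existing value, nothing to finalize.
        return value;
      }
    });
    return ref.get();
  }

  public static void main(String[] args) {
    ConcurrentHashMap<Integer, MergeInfo> partitions = new ConcurrentHashMap<>();
    Map<Integer, String> state = new HashMap<>();
    state.put(0, "partition-0");
    partitions.put(7, new MergeInfo(3, false, state));

    System.out.println("finalized partitions: " + finalizeMerge(partitions, 7, 3));
    try {
      finalizeMerge(partitions, 7, 3); // second request for the same shuffle is stale
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Because the entry is replaced rather than mutated, a concurrent reader sees either the old unfinalized value or the new finalized marker, never a half-updated object.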