mridulm commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r677990832



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -410,17 +493,40 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
           + "with the current attempt id %s stored in shuffle service for application %s",
           msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
     }
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      appShuffleInfo.partitions.remove(msg.shuffleId);
+    Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions = null;
+    // Determinate stage can remove metadata of the shuffle Id as part of finalizing shuffle
+    // merge
+    if (msg.shuffleMergeId == 0) {
+      shuffleMergePartitions =
+        appShuffleInfo.partitions.remove(msg.shuffleId).shuffleMergePartitions;
+    } else {
+      // For indeterminate stage we will set shuffleMergePartitions of the shuffleId to
+      // INDETERMINATE_SHUFFLE_FINALIZED. This is needed so as to reject old shuffleMergeId
+      // push requests after the shuffle merge is finalized.
+      if (null == appShuffleInfo.partitions.get(msg.shuffleId) ||
+          INDETERMINATE_SHUFFLE_FINALIZED ==
+            appShuffleInfo.partitions.get(msg.shuffleId).shuffleMergePartitions) {
+        throw new RuntimeException(
+          String.format("Shuffle merge finalize request for shuffle %s with"
+            + " shuffleMergeId %s is %s", msg.shuffleId, msg.shuffleMergeId,
+            ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
+      } else if (msg.shuffleMergeId ==
+          appShuffleInfo.partitions.get(msg.shuffleId).shuffleMergeId) {
+        shuffleMergePartitions =
+          appShuffleInfo.partitions.get(msg.shuffleId).shuffleMergePartitions;
+        appShuffleInfo.partitions.get(msg.shuffleId).markIndeterminateShuffleFinalized();
+      }

Review comment:
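       This is not thread-safe: the repeated `get` calls, the `shuffleMergeId` comparison, and `markIndeterminateShuffleFinalized()` can interleave with concurrent updates to `appShuffleInfo.partitions`. You will need to do this atomically.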
   
   ```suggestion
         AtomicReference<Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitionsRef =
             new AtomicReference<>(null);
         appShuffleInfo.partitions.compute(msg.shuffleId, (id, value) -> {
           if (null == value || INDETERMINATE_SHUFFLE_FINALIZED == value.shuffleMergePartitions) {
             throw new RuntimeException(
                 String.format("Shuffle merge finalize request for shuffle %s with"
                     + " shuffleMergeId %s is %s", msg.shuffleId, msg.shuffleMergeId,
                     ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
           } else if (msg.shuffleMergeId == value.shuffleMergeId) {
             shuffleMergePartitionsRef.set(value.shuffleMergePartitions);
             // Replace with a marker value.
             // Note: take a constructor parameter (true/false) to specify whether it is finalized,
             // make the fields of AppShuffleMergePartitionsInfo final, and remove
             // markIndeterminateShuffleFinalized.
             return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
           } else {
             // shuffleMergeId mismatch: keep the existing value; there are no shuffle merge
             // partitions to finalize.
             return value;
           }
         });
         shuffleMergePartitions = shuffleMergePartitionsRef.get();
   ```
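   The suggestion relies on `compute` running the remapping function atomically per key, which holds assuming `appShuffleInfo.partitions` is a `ConcurrentHashMap`. Per the `Map.compute` contract, an exception thrown from the remapping function is rethrown and the current mapping is left unchanged, so the stale-finalize path does not corrupt the entry. A minimal, self-contained sketch of the same pattern follows. `AtomicFinalizeSketch`, `MergeInfo`, and `finalizeMerge` are hypothetical names, and a `null` partition map stands in for the `INDETERMINATE_SHUFFLE_FINALIZED` marker; this is not Spark's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified model of per-shuffle merge state; not Spark's actual classes. */
public class AtomicFinalizeSketch {

  /** Immutable entry: a null partitions map plays the role of the "finalized" marker. */
  static final class MergeInfo {
    final int shuffleMergeId;
    final Map<Integer, String> partitions;

    MergeInfo(int shuffleMergeId, Map<Integer, String> partitions) {
      this.shuffleMergeId = shuffleMergeId;
      this.partitions = partitions;
    }

    boolean isFinalized() {
      return partitions == null;
    }
  }

  final ConcurrentHashMap<Integer, MergeInfo> shuffles = new ConcurrentHashMap<>();

  /**
   * Atomically finalizes shuffleId at shuffleMergeId and returns its partitions,
   * or null when the stored shuffleMergeId differs. Throws if the shuffle is unknown
   * or already finalized; compute() rethrows and leaves the mapping unchanged.
   */
  Map<Integer, String> finalizeMerge(int shuffleId, int shuffleMergeId) {
    AtomicReference<Map<Integer, String>> result = new AtomicReference<>(null);
    shuffles.compute(shuffleId, (id, value) -> {
      if (value == null || value.isFinalized()) {
        throw new IllegalStateException("stale finalize for shuffle " + id);
      } else if (value.shuffleMergeId == shuffleMergeId) {
        result.set(value.partitions);
        // Swap in a new immutable marker entry instead of mutating the old one.
        return new MergeInfo(shuffleMergeId, null);
      } else {
        // shuffleMergeId mismatch: keep the existing entry, nothing to finalize.
        return value;
      }
    });
    return result.get();
  }

  public static void main(String[] args) {
    AtomicFinalizeSketch s = new AtomicFinalizeSketch();
    Map<Integer, String> parts = new HashMap<>();
    parts.put(0, "reduce-0");
    s.shuffles.put(7, new MergeInfo(1, parts));
    System.out.println(s.finalizeMerge(7, 1)); // {0=reduce-0}
    try {
      s.finalizeMerge(7, 1); // second finalize hits the marker and is rejected
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

   Because `MergeInfo` is immutable, a concurrent reader observes either the old entry or the finalized marker, never a half-updated one; this is presumably the motivation for the note about making the fields of `AppShuffleMergePartitionsInfo` final.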



