mridulm commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r676098927



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -135,51 +150,87 @@ protected AppShuffleInfo validateAndGetAppShuffleInfo(String appId) {
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
       AppShuffleInfo appShuffleInfo,
       int shuffleId,
+      int shuffleMergeId,
       int reduceId) {
-    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, reduceId);
-    ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
+    ConcurrentMap<Integer, Map<Integer, Map<Integer, AppShufflePartitionInfo>>> partitions =
       appShuffleInfo.partitions;
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      partitions.compute(shuffleId, (id, map) -> {
-        if (map == null) {
-          // If this partition is already finalized then the partitions map will not contain the
-          // shuffleId but the data file would exist. In that case the block is considered late.
-          if (dataFile.exists()) {
+    Map<Integer, Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitions =
+      partitions.compute(shuffleId, (id, shuffleMergePartitionsMap) -> {
+        if (shuffleMergePartitionsMap == null) {
+          logger.info("Creating a new attempt for shuffle blocks push request for"
+              + " shuffle {} with shuffleMergeId {} for application {}_{}", shuffleId,
+              shuffleMergeId, appShuffleInfo.appId, appShuffleInfo.attemptId);
+          Map<Integer, Map<Integer, AppShufflePartitionInfo>> newShuffleMergePartitions
+            = new ConcurrentHashMap<>();
+          Map<Integer, AppShufflePartitionInfo> newPartitionsMap = new ConcurrentHashMap<>();
+          newShuffleMergePartitions.put(shuffleMergeId, newPartitionsMap);
+          return newShuffleMergePartitions;
+        } else if (shuffleMergePartitionsMap.containsKey(shuffleMergeId)) {
+          return shuffleMergePartitionsMap;
+        } else {
+          int latestShuffleMergeId = shuffleMergePartitionsMap.keySet().stream()
+            .mapToInt(v -> v).max().orElse(UNDEFINED_SHUFFLE_MERGE_ID);
+          if (latestShuffleMergeId > shuffleMergeId) {
+            logger.info("Rejecting shuffle blocks push request for shuffle {} with"
+                + " shuffleMergeId {} for application {}_{} as a higher shuffleMergeId"
+                + " {} request is already seen", shuffleId, shuffleMergeId,
+                appShuffleInfo.appId, appShuffleInfo.attemptId, latestShuffleMergeId);
+            // Reject the request as we have already seen a higher shuffleMergeId than the
+            // current incoming one
             return null;

Review comment:
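      Returning `null` here will end up removing `shuffleId` from the `partitions` map, discarding the state already tracked for the higher `shuffleMergeId`: when the remapping function passed to `compute` returns `null`, the existing mapping is removed. Please look up the semantics of the `compute` method for more details.
   
   I would suggest throwing an exception and handling it when we have to reject stale pushes.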
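   A minimal, self-contained sketch of the two behaviors, using plain `java.util.concurrent` types. It relies on the documented `ConcurrentHashMap.compute` contract: a `null` result from the remapping function removes the mapping, while a `RuntimeException` thrown by the remapping function leaves the mapping unchanged and propagates to the caller. `StalePushDemo`, `StaleBlockPushException`, `getReturningNull`, `getOrReject`, the `String` stand-in for `AppShufflePartitionInfo`, and the value `-1` for the undefined merge id are all hypothetical, not Spark APIs. The sketch also simply adds an entry when a higher `shuffleMergeId` arrives and does not model cleanup of older attempts.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class StalePushDemo {

  // Hypothetical unchecked exception used to signal a stale push out of compute().
  static final class StaleBlockPushException extends RuntimeException {
    StaleBlockPushException(String message) {
      super(message);
    }
  }

  // Assumed stand-in for UNDEFINED_SHUFFLE_MERGE_ID.
  private static final int UNDEFINED_MERGE_ID = -1;

  // Simplified stand-in for appShuffleInfo.partitions:
  // shuffleId -> (shuffleMergeId -> (reduceId -> partition info as a String)).
  final ConcurrentMap<Integer, Map<Integer, Map<Integer, String>>> partitions =
      new ConcurrentHashMap<>();

  // Remapping logic shared by both variants; only the stale-push branch differs.
  private static Map<Integer, Map<Integer, String>> remap(
      Map<Integer, Map<Integer, String>> mergeMap, int shuffleMergeId, boolean throwOnStale) {
    if (mergeMap == null) {
      // First push for this shuffleId: create the nested maps.
      Map<Integer, Map<Integer, String>> fresh = new ConcurrentHashMap<>();
      fresh.put(shuffleMergeId, new ConcurrentHashMap<>());
      return fresh;
    }
    if (mergeMap.containsKey(shuffleMergeId)) {
      return mergeMap;
    }
    int latest = mergeMap.keySet().stream()
        .mapToInt(v -> v).max().orElse(UNDEFINED_MERGE_ID);
    if (latest > shuffleMergeId) {
      if (throwOnStale) {
        // compute() leaves the existing mapping unchanged and rethrows this.
        throw new StaleBlockPushException(
            "shuffleMergeId " + shuffleMergeId + " is older than " + latest);
      }
      // Pattern from the diff: compute() treats null as "remove shuffleId".
      return null;
    }
    // Newer shuffleMergeId: this sketch just adds an entry (cleanup not modeled).
    mergeMap.put(shuffleMergeId, new ConcurrentHashMap<>());
    return mergeMap;
  }

  // Returns null on a stale push, but also drops all state for shuffleId.
  Map<Integer, Map<Integer, String>> getReturningNull(int shuffleId, int shuffleMergeId) {
    return partitions.compute(shuffleId, (id, m) -> remap(m, shuffleMergeId, false));
  }

  // Throws StaleBlockPushException on a stale push; partitions is left untouched.
  Map<Integer, Map<Integer, String>> getOrReject(int shuffleId, int shuffleMergeId) {
    return partitions.compute(shuffleId, (id, m) -> remap(m, shuffleMergeId, true));
  }

  public static void main(String[] args) {
    StalePushDemo nullDemo = new StalePushDemo();
    nullDemo.getReturningNull(0, 2);
    nullDemo.getReturningNull(0, 1); // stale push
    System.out.println(nullDemo.partitions.containsKey(0)); // false: shuffle 0 state is gone

    StalePushDemo throwDemo = new StalePushDemo();
    throwDemo.getOrReject(0, 2);
    try {
      throwDemo.getOrReject(0, 1); // stale push
    } catch (StaleBlockPushException e) {
      System.out.println("rejected: " + e.getMessage());
    }
    System.out.println(throwDemo.partitions.get(0).keySet()); // [2]: still tracked
  }
}
```

   Running `main` prints `false`, then `rejected: shuffleMergeId 1 is older than 2`, then `[2]`. With the exception-based variant the caller decides how to reject the push (for example, by failing the request), and the entry for the higher `shuffleMergeId` survives.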




-- 
This is an automated message from the Apache Git Service.