mridulm commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r676099348



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -135,51 +150,87 @@ protected AppShuffleInfo validateAndGetAppShuffleInfo(String appId) {
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
       AppShuffleInfo appShuffleInfo,
       int shuffleId,
+      int shuffleMergeId,
       int reduceId) {
-    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, reduceId);
-    ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
+    ConcurrentMap<Integer, Map<Integer, Map<Integer, AppShufflePartitionInfo>>> partitions =
       appShuffleInfo.partitions;
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      partitions.compute(shuffleId, (id, map) -> {
-        if (map == null) {
-          // If this partition is already finalized then the partitions map will not contain the
-          // shuffleId but the data file would exist. In that case the block is considered late.
-          if (dataFile.exists()) {
+    Map<Integer, Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitions =
+      partitions.compute(shuffleId, (id, shuffleMergePartitionsMap) -> {
+        if (shuffleMergePartitionsMap == null) {
+          logger.info("Creating a new attempt for shuffle blocks push request for"
+              + " shuffle {} with shuffleMergeId {} for application {}_{}", shuffleId,
+              shuffleMergeId, appShuffleInfo.appId, appShuffleInfo.attemptId);
+          Map<Integer, Map<Integer, AppShufflePartitionInfo>> newShuffleMergePartitions
+            = new ConcurrentHashMap<>();
+          Map<Integer, AppShufflePartitionInfo> newPartitionsMap = new ConcurrentHashMap<>();
+          newShuffleMergePartitions.put(shuffleMergeId, newPartitionsMap);
+          return newShuffleMergePartitions;
+        } else if (shuffleMergePartitionsMap.containsKey(shuffleMergeId)) {
+          return shuffleMergePartitionsMap;
+        } else {
+          int latestShuffleMergeId = shuffleMergePartitionsMap.keySet().stream()
+            .mapToInt(v -> v).max().orElse(UNDEFINED_SHUFFLE_MERGE_ID);
+          if (latestShuffleMergeId > shuffleMergeId) {
+            logger.info("Rejecting shuffle blocks push request for shuffle {} with"
+                + " shuffleMergeId {} for application {}_{} as a higher shuffleMergeId"
+                + " {} request is already seen", shuffleId, shuffleMergeId,
+                appShuffleInfo.appId, appShuffleInfo.attemptId, latestShuffleMergeId);
+            // Reject the request as we have already seen a higher shuffleMergeId than the
+            // current incoming one
             return null;
+          } else {
+            // Higher shuffleMergeId seen for the shuffle ID meaning new stage attempt is being
+            // run for the shuffle ID. Close and clean up old shuffleMergeId files,
+            // happens in the non-deterministic stage retries
+            logger.info("Creating a new attempt for shuffle blocks push request for"
+               + " shuffle {} with shuffleMergeId {} for application {}_{} since it is"
+               + " higher than the latest shuffleMergeId {} already seen", shuffleId,
+               shuffleMergeId, appShuffleInfo.appId, appShuffleInfo.attemptId,
+               latestShuffleMergeId);
+            if (null != shuffleMergePartitionsMap.get(latestShuffleMergeId)) {
+              Map<Integer, AppShufflePartitionInfo> shufflePartitions =
+                shuffleMergePartitionsMap.get(latestShuffleMergeId);
+              mergedShuffleCleaner.execute(() ->
+                closeAndDeletePartitionFiles(shufflePartitions));
+            }
+            shuffleMergePartitionsMap.put(latestShuffleMergeId, STALE_SHUFFLE_PARTITIONS);

Review comment:
       Why are we keeping all previous entries? Just the latest one (the latest shuffleMergeId in this case) should do, shouldn't it?
   
   Also, assuming the current code is correct:
   ```suggestion
               Map<Integer, AppShufflePartitionInfo> latestShufflePartitions = shuffleMergePartitionsMap.get(latestShuffleMergeId);
               // latestShuffleMergeId cannot be UNDEFINED_SHUFFLE_MERGE_ID, since the
               // shuffle merge id is specific to a shuffle id.
               assert (UNDEFINED_SHUFFLE_MERGE_ID != latestShuffleMergeId);
               assert (null != latestShufflePartitions);
               
               if (STALE_SHUFFLE_PARTITIONS != latestShufflePartitions) {
                 mergedShuffleCleaner.execute(() ->
                   closeAndDeletePartitionFiles(latestShufflePartitions));
                 shuffleMergePartitionsMap.put(latestShuffleMergeId, STALE_SHUFFLE_PARTITIONS);
               }
   ```
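The alternative raised in the comment (retaining only the latest shuffleMergeId per shuffle ID rather than every previous entry) can be sketched roughly as follows. This is a minimal, hypothetical sketch, not Spark's implementation: `LatestMergeIdTracker`, `MergeState`, `getOrCreate`, `latestMergeId` and the `-1` sentinel are illustrative names, partition info is an `Object` placeholder, and cleanup is reduced to recording the superseded id. It also assumes the remapping function should never return `null` to signal rejection, because `ConcurrentHashMap.compute` removes the existing mapping when the function returns `null`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch, not Spark code: keeps only the latest shuffleMergeId per shuffleId.
final class LatestMergeIdTracker {

  // Per-shuffle state for a single merge attempt (shuffleMergeId).
  static final class MergeState {
    final int shuffleMergeId;
    // reduceId -> per-partition merge info; Object is a placeholder for
    // something like AppShufflePartitionInfo.
    final Map<Integer, Object> partitions = new ConcurrentHashMap<>();

    MergeState(int shuffleMergeId) {
      this.shuffleMergeId = shuffleMergeId;
    }
  }

  private final ConcurrentHashMap<Integer, MergeState> shuffles = new ConcurrentHashMap<>();

  // Records superseded merge ids; a real implementation would instead hand
  // the old partitions to a cleaner that closes and deletes their files.
  final List<Integer> superseded = new CopyOnWriteArrayList<>();

  /**
   * Returns the state for (shuffleId, shuffleMergeId), creating it if needed,
   * or null if a higher shuffleMergeId has already been seen for this shuffle.
   */
  MergeState getOrCreate(int shuffleId, int shuffleMergeId) {
    MergeState latest = shuffles.compute(shuffleId, (id, current) -> {
      if (current == null) {
        return new MergeState(shuffleMergeId);
      }
      if (current.shuffleMergeId < shuffleMergeId) {
        // A newer stage attempt: replace the old state instead of keeping it around.
        superseded.add(current.shuffleMergeId);
        return new MergeState(shuffleMergeId);
      }
      // Same or higher merge id already present: leave the mapping unchanged.
      // Returning null here would remove the mapping for shuffleId entirely.
      return current;
    });
    return latest.shuffleMergeId == shuffleMergeId ? latest : null;
  }

  // Returns the latest merge id seen for shuffleId, or -1 (hypothetical
  // stand-in for UNDEFINED_SHUFFLE_MERGE_ID) if none.
  int latestMergeId(int shuffleId) {
    MergeState state = shuffles.get(shuffleId);
    return state == null ? -1 : state.shuffleMergeId;
  }

  public static void main(String[] args) {
    LatestMergeIdTracker tracker = new LatestMergeIdTracker();
    System.out.println(tracker.getOrCreate(0, 0) != null); // true: first attempt
    System.out.println(tracker.getOrCreate(0, 1) != null); // true: newer attempt supersedes 0
    System.out.println(tracker.getOrCreate(0, 0) != null); // false: older attempt rejected
    System.out.println(tracker.latestMergeId(0));          // 1
    System.out.println(tracker.superseded);                // [0]
  }
}
```

In this sketch each shuffle holds a single `MergeState`, so a newer attempt replaces the old one atomically inside `compute`, and a stale request is detected by comparing the returned state's id with the requested one rather than through a `STALE_SHUFFLE_PARTITIONS` marker.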




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
