mridulm commented on a change in pull request #35325:
URL: https://github.com/apache/spark/pull/35325#discussion_r793202348



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -534,35 +515,33 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
     }
     AtomicReference<Map<Integer, AppShufflePartitionInfo>> 
shuffleMergePartitionsRef =
       new AtomicReference<>(null);
-    // Metadata of the determinate stage shuffle can be safely removed as part 
of finalizing
-    // shuffle merge. Currently once the shuffle is finalized for a 
determinate stages, retry
-    // stages of the same shuffle will have shuffle push disabled.
-    if (msg.shuffleMergeId == DETERMINATE_SHUFFLE_MERGE_ID) {
-      AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo =
-        appShuffleInfo.shuffles.remove(msg.shuffleId);
-      if (appShuffleMergePartitionsInfo != null) {
-        
shuffleMergePartitionsRef.set(appShuffleMergePartitionsInfo.shuffleMergePartitions);
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+      if (null == mergePartitionsInfo) {
+        //If the mergePartitions was never created then it means that there 
weren't any push
+        //blocks that were ever received for this shuffle. This could be the 
case when the driver
+        //doesn't wait for enough time to start the stage which reads this 
shuffle data.

Review comment:
       nit: space between `//` and text.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -534,35 +515,33 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
     }
     AtomicReference<Map<Integer, AppShufflePartitionInfo>> 
shuffleMergePartitionsRef =
       new AtomicReference<>(null);
-    // Metadata of the determinate stage shuffle can be safely removed as part 
of finalizing
-    // shuffle merge. Currently once the shuffle is finalized for a 
determinate stages, retry
-    // stages of the same shuffle will have shuffle push disabled.
-    if (msg.shuffleMergeId == DETERMINATE_SHUFFLE_MERGE_ID) {
-      AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo =
-        appShuffleInfo.shuffles.remove(msg.shuffleId);
-      if (appShuffleMergePartitionsInfo != null) {
-        
shuffleMergePartitionsRef.set(appShuffleMergePartitionsInfo.shuffleMergePartitions);
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+      if (null == mergePartitionsInfo) {
+        //If the mergePartitions was never created then it means that there 
weren't any push
+        //blocks that were ever received for this shuffle. This could be the 
case when the driver
+        //doesn't wait for enough time to start the stage which reads this 
shuffle data.
+        return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+      } else if (msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId
+          || mergePartitionsInfo.isFinalized()) {
+        throw new RuntimeException(
+            String.format("Shuffle merge finalize request for shuffle %s with" 
+
+                " shuffleMergeId %s is %s", msg.shuffleId, msg.shuffleMergeId,
+                
ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
+      } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
+        // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId 
then return
+        // empty MergeStatuses but cleanup the older shuffleMergeId files.
+        mergedShuffleCleaner.execute(() ->
+            
closeAndDeletePartitionFiles(mergePartitionsInfo.shuffleMergePartitions));
+        return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+      } else {
+        // This block covers:
+        //  1. finalization of determinate stage
+        //  2. finalization of indeterminate stage if the shuffleMergeId 
related to it is the one
+        //  for which the message is received.
+        
shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
+        return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);

Review comment:
       nit: All non-throwing paths in this lambda return the same thing - 
`new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);`
   Can we make that common?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -576,14 +555,25 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
       for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) 
{
         synchronized (partition) {
           try {
+            logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalizing 
shuffle partition "
+                + "{} ", msg.appId, msg.appAttemptId, msg.shuffleId,
+                msg.shuffleMergeId, partition.reduceId);
             // This can throw IOException which will marks this shuffle 
partition as not merged.
             partition.finalizePartition();
-            bitmaps.add(partition.mapTracker);
-            reduceIds.add(partition.reduceId);
-            sizes.add(partition.getLastChunkOffset());
+            if (partition.mapTracker.getCardinality() > 0) {
+              bitmaps.add(partition.mapTracker);
+              reduceIds.add(partition.reduceId);
+              sizes.add(partition.getLastChunkOffset());
+              logger.debug("{} attempt {} shuffle {} shuffleMerge {}: 
finalization results added"
+              + " for partition {} data size {} index size {} meta size {}",

Review comment:
       nit: move the `+` to the end of the previous line (here and elsewhere 
in string concatenation for log messages)

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -534,35 +515,33 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
     }
     AtomicReference<Map<Integer, AppShufflePartitionInfo>> 
shuffleMergePartitionsRef =
       new AtomicReference<>(null);
-    // Metadata of the determinate stage shuffle can be safely removed as part 
of finalizing
-    // shuffle merge. Currently once the shuffle is finalized for a 
determinate stages, retry
-    // stages of the same shuffle will have shuffle push disabled.
-    if (msg.shuffleMergeId == DETERMINATE_SHUFFLE_MERGE_ID) {
-      AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo =
-        appShuffleInfo.shuffles.remove(msg.shuffleId);
-      if (appShuffleMergePartitionsInfo != null) {
-        
shuffleMergePartitionsRef.set(appShuffleMergePartitionsInfo.shuffleMergePartitions);
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+      if (null == mergePartitionsInfo) {
+        //If the mergePartitions was never created then it means that there 
weren't any push
+        //blocks that were ever received for this shuffle. This could be the 
case when the driver
+        //doesn't wait for enough time to start the stage which reads this 
shuffle data.
+        return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+      } else if (msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId
+          || mergePartitionsInfo.isFinalized()) {

Review comment:
       nit: move `||` to prev line




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to