mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990751884


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java:
##########
@@ -224,6 +224,12 @@ protected void handleMessage(
       } finally {
         responseDelayContext.stop();
       }
+    } else if (msgObj instanceof RemoveShuffleMerge) {
+      RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj;
+      checkAuth(client, msg.appId);
+      logger.info("Removing shuffle merge data for application {} shuffle {} shuffleMerge {}",
+          msg.appId, msg.shuffleId, msg.shuffleMergeId);
+      mergeManager.removeShuffleMerge(msg.appId, msg.shuffleId, msg.shuffleMergeId);

Review Comment:
   We also need to pass `appAttemptId` in `RemoveShuffleMerge`, as I noted in an earlier comment.
   Take a look at how `FinalizeShuffleMerge` is processed and handle this similarly? (We can pass the `RemoveShuffleMerge` message itself to `mergeManager.removeShuffleMerge` and look up the fields there.)
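
   As a minimal, self-contained sketch of that shape (not the actual Spark classes): `RemoveShuffleMergeSketch`, `MergeManagerSketch` and `HandlerSketch` are hypothetical stand-ins, and the field layout is an assumption based on this comment.

```java
// Hypothetical stand-in for the RemoveShuffleMerge message, now carrying appAttemptId.
final class RemoveShuffleMergeSketch {
  final String appId;
  final int appAttemptId;
  final int shuffleId;
  final int shuffleMergeId;

  RemoveShuffleMergeSketch(String appId, int appAttemptId, int shuffleId, int shuffleMergeId) {
    this.appId = appId;
    this.appAttemptId = appAttemptId;
    this.shuffleId = shuffleId;
    this.shuffleMergeId = shuffleMergeId;
  }
}

// Hypothetical merge manager that receives the whole message instead of loose fields.
interface MergeManagerSketch {
  void removeShuffleMerge(RemoveShuffleMergeSketch msg);
}

final class HandlerSketch {
  // Returns true if the message was recognized and forwarded to the merge manager.
  static boolean handleMessage(Object msgObj, MergeManagerSketch mergeManager) {
    if (msgObj instanceof RemoveShuffleMergeSketch) {
      RemoveShuffleMergeSketch msg = (RemoveShuffleMergeSketch) msgObj;
      // An auth check such as checkAuth(client, msg.appId) would go here.
      mergeManager.removeShuffleMerge(msg);
      return true;
    }
    return false;
  }
}
```

   Passing the message object keeps the handler unchanged if more fields are added later; the manager reads whatever it needs.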



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);

Review Comment:
   Add validation for `attemptId` here.
   Take a look at `finalizeShuffleMerge` for an example.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    AppShuffleMergePartitionsInfo partitionsInfo = appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   We should not remove it directly: the value in the map could be for a different `shuffleMergeId` (a newer one, for example).
   Take a look at `finalizeShuffleMerge` for how to handle the corner cases.
   
   Rough sketch is:
   
   ```
     public void removeShuffleMerge(RemoveShuffleMerge msg) {
       AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
       if (appShuffleInfo.attemptId != msg.appAttemptId) {
         // incoming request for older app attempt - exception
         throw new IllegalArgumentException("appropriate msg for invalid app attempt");
       }

       appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {

         if (null != mergePartitionsInfo) {
           // where DELETE_CURRENT == -1
           // merge id will be set to -1 when we are cleaning up shuffle, and there is
           // no chance of its reuse - else it will be set to an explicit value.
           boolean deleteAny = msg.shuffleMergeId == DELETE_CURRENT;

           // Looks like there is a bug in finalizeShuffleMerge, let us fix it here anyway
           // and handle it for finalizeShuffleMerge in a different PR.
           AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
               new AppAttemptShuffleMergeId(
                   msg.appId, msg.appAttemptId,
                   msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
           if (!deleteAny && msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {

             // throw exception - request for an older shuffle merge id
             throw new RuntimeException("appropriate msg for delete of old merge id");
           } else if (!deleteAny && msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {

             // cleanup request for newer shuffle - remove the outdated data we have.
             submitCleanupTask(() ->
                     closeAndDeleteOutdatedPartitions(
                             currentAppAttemptShuffleMergeId,
                             mergePartitionsInfo.shuffleMergePartitions));
           } else {

             // request to cleanup shuffle we are currently hosting

             // Not yet finalized - use the existing cleanup mechanism
             if (!mergePartitionsInfo.isFinalized()) {
               submitCleanupTask(() ->
                       closeAndDeleteOutdatedPartitions(
                               currentAppAttemptShuffleMergeId,
                               mergePartitionsInfo.shuffleMergePartitions));
             } else {

               if (!mergePartitionsInfo.getReduceIds().isEmpty()) {
                 // Introduce new method which deletes the files for shuffleMergeId
                 submitCleanupTask(() ->
                       deleteMergedFiles(msg.appId, msg.appAttemptId,
                                     msg.shuffleId, msg.shuffleMergeId,
                                     // To be introduced - see below
                                     mergePartitionsInfo.getReduceIds()));

               }
               // simply return existing entry immediately - db does not need updating -
               // we can actually drop reduce-ids here as an optimization
               return mergePartitionsInfo;
             }
           }
         }
         // keep track of the latest merge id - and mark it as finalized and
         // immutable as already marked for deletion/deleted.
         AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
           msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
         writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
         // no reduceid's
         return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
       });
     }
   ```
   
   To write `deleteMergedFiles`, the only thing missing is the set of valid reduce ids (`getReduceIds` above).
   We can keep track of that by modifying `finalizeShuffleMerge` as follows:
   
   a) Keep a reference to the value returned from `appShuffleInfo.shuffles.compute()`.
   b) Before returning `mergeStatuses`, update that value with the `reduceIds`.
   c) For efficiency, we can convert it to a bitmap to save space, but that is an implementation detail.
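
   Steps (a) to (c) could look roughly like the following self-contained sketch. It is not the `RemoteBlockPushResolver` code: `MergeInfoSketch`, `FinalizeSketch`, `setReduceIds` and the use of `java.util.BitSet` as the bitmap are all assumptions made for illustration.

```java
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for AppShuffleMergePartitionsInfo; not the Spark class.
final class MergeInfoSketch {
  final int shuffleMergeId;
  private volatile BitSet reduceIds = new BitSet();

  MergeInfoSketch(int shuffleMergeId) {
    this.shuffleMergeId = shuffleMergeId;
  }

  // (b) record the finalized reduce ids; (c) stored as a bitmap to save space.
  void setReduceIds(List<Integer> ids) {
    BitSet bits = new BitSet();
    for (int id : ids) {
      bits.set(id);
    }
    this.reduceIds = bits;
  }

  // Returns a copy so callers cannot mutate the tracked state.
  BitSet getReduceIds() {
    return (BitSet) reduceIds.clone();
  }
}

final class FinalizeSketch {
  static MergeInfoSketch finalizeShuffleMerge(
      Map<Integer, MergeInfoSketch> shuffles,
      int shuffleId,
      int shuffleMergeId,
      List<Integer> finalizedReduceIds) {
    // (a) keep a reference to the value returned by compute()
    MergeInfoSketch finalized =
        shuffles.compute(shuffleId, (id, existing) -> new MergeInfoSketch(shuffleMergeId));
    // (b) before returning, remember which reduce ids were finalized
    finalized.setReduceIds(finalizedReduceIds);
    return finalized;
  }

  public static void main(String[] args) {
    Map<Integer, MergeInfoSketch> shuffles = new ConcurrentHashMap<>();
    MergeInfoSketch info = finalizeShuffleMerge(shuffles, 7, 0, List.of(1, 3, 5));
    System.out.println("tracked reduce ids: " + info.getReduceIds());
  }
}
```

   A later remove request can then read `getReduceIds()` from the map entry to know which merged files exist, without listing directories.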
   
   Thoughts ?
   
   +CC @otterc, @zhouyejoe - please take a look at the bug that appears to be in `finalizeShuffleMerge`, for which I have sketched a fix above.
   Unless I am missing something, we should fix `finalizeShuffleMerge` in a separate PR.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -702,7 +722,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
                 "finalizing shuffle partition {}", msg.appId, msg.appAttemptId, msg.shuffleId,
                 msg.shuffleMergeId, partition.reduceId);
           } finally {
-            partition.closeAllFilesAndDeleteIfNeeded(false);
+            Boolean deleteFile = partition.mapTracker.getCardinality() == 0;
+            partition.closeAllFilesAndDeleteIfNeeded(deleteFile);

Review Comment:
   Revert ?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -650,24 +666,28 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
         } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
           // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
           // empty MergeStatuses but cleanup the older shuffleMergeId files.
+          Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
+              mergePartitionsInfo.shuffleMergePartitions;
           submitCleanupTask(() ->
-              closeAndDeleteOutdatedPartitions(
-                  appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+              closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, shuffleMergePartitions));
         } else {

Review Comment:
   revert this ?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -650,24 +666,28 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
         } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
           // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
           // empty MergeStatuses but cleanup the older shuffleMergeId files.
+          Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
+              mergePartitionsInfo.shuffleMergePartitions;
           submitCleanupTask(() ->
-              closeAndDeleteOutdatedPartitions(
-                  appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+              closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, shuffleMergePartitions));
         } else {
           // This block covers:
           //  1. finalization of determinate stage
           //  2. finalization of indeterminate stage if the shuffleMergeId related to it is the one
           //  for which the message is received.
           shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
         }
+      } else {
+        mergePartitionsInfo = new AppShuffleMergePartitionsInfo(shuffleId, true);
       }
+      mergePartitionsInfo.setFinalized(true);
       // Update the DB for the finalized shuffle
       writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
       // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results
       // sent to the driver will be empty. This can happen when the service didn't receive any
       // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the
       // shuffle.
-      return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+      return mergePartitionsInfo;

Review Comment:
   revert changes to this method ?



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2543,16 +2541,13 @@ private[spark] class DAGScheduler(
       shuffleIdToMapStage.filter { case (_, stage) =>
         stage.shuffleDep.shuffleMergeAllowed && stage.shuffleDep.getMergerLocs.isEmpty &&
           runningStages.contains(stage)
-      }.foreach { case(_, stage: ShuffleMapStage) =>
-          if (getAndSetShufflePushMergerLocations(stage).nonEmpty) {
-            logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" +
-              s" ${stage.shuffleDep.shuffleId} and shuffle merge" +
-              s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" +
-              s" merger locations")
-            mapOutputTracker.registerShufflePushMergerLocations(stage.shuffleDep.shuffleId,
-              stage.shuffleDep.getMergerLocs)
-          }
-        }
+      }.foreach { case (_, stage: ShuffleMapStage) =>
+        configureShufflePushMergerLocations(stage)
+        logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" +

Review Comment:
   Surround the `logInfo` with `if (stage.shuffleDep.getMergerLocs.nonEmpty)`; otherwise we will print that log line even if mergers were not set.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1639,9 +1661,9 @@ void closeAllFilesAndDeleteIfNeeded(boolean delete) {
       try {
         if (dataChannel.isOpen()) {
           dataChannel.close();
-          if (delete) {
-            dataFile.delete();
-          }
+        }
+        if (delete) {
+          dataFile.delete();

Review Comment:
   This looks like an orthogonal bug fix.
   +CC @otterc 
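
   To illustrate the behavior change in this hunk, here is a minimal standalone sketch, assuming a plain `FileChannel` and `File` in place of the partition's fields. `CloseAndDeleteSketch` is a hypothetical name; this is not the Spark method.

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

final class CloseAndDeleteSketch {
  // Close the channel only if it is still open, but honor the delete request
  // either way, so a file whose channel was already closed is still removed.
  static void closeAndDeleteIfNeeded(FileChannel channel, File file, boolean delete)
      throws IOException {
    if (channel.isOpen()) {
      channel.close();
    }
    if (delete) {
      file.delete();
    }
  }

  public static void main(String[] args) throws IOException {
    File file = File.createTempFile("merged", ".data");
    FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE);
    channel.close(); // simulate a channel that was closed earlier
    closeAndDeleteIfNeeded(channel, file, true);
    System.out.println("exists after cleanup: " + file.exists());
  }
}
```

   With the delete nested inside the `isOpen()` check, as in the removed lines, the file in this scenario would be left on disk.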



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##########
@@ -256,6 +256,20 @@ public void onFailure(Throwable e) {
     }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);

Review Comment:
   `send` should be fine here: there is no response from the ESS to the driver when sending `RemoveShuffleMerge`.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1410,26 +1431,27 @@ public String toString() {
    * required for the shuffles of indeterminate stages.
    */
   public static class AppShuffleMergePartitionsInfo {

Review Comment:
   Revert changes to this class ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

