mridulm commented on code in PR #37624:
URL: https://github.com/apache/spark/pull/37624#discussion_r953395486


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -795,13 +796,34 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
   }
 
   /**
-   * Close the DB during shutdown
+   * Shut down the mergedShuffleCleaner and close the DB during shutdown.
    */
   @Override
   public void close() {
+    if (!mergedShuffleCleaner.isShutdown()) {
+      // Use a two-phase shutdown, see
+      // https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
+      try {
+        mergedShuffleCleaner.shutdown();
+        // Wait a while for existing tasks to terminate
+        if (!mergedShuffleCleaner.awaitTermination(10L, TimeUnit.SECONDS)) {
+          mergedShuffleCleaner.shutdownNow();
+          // Wait a while for tasks to respond to being cancelled
+          if (!mergedShuffleCleaner.awaitTermination(10L, TimeUnit.SECONDS)) {
+            logger.warn("mergedShuffleCleaner did not terminate");
+          }
+        }

Review Comment:
   We typically do not do both in Spark.
   Please add a note explaining that we are doing both here because:
   a) We want to prevent new tasks from being submitted and wait for executing tasks to complete gracefully.
   b) Once the timeout is reached, we want to interrupt running tasks so that they fail.
   
   This is to prevent updates to RocksDB after it is closed.
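   The two-phase pattern referenced above (from the `ExecutorService` javadoc) could be sketched like this. This is a hypothetical standalone illustration, not the PR's actual code; the class and method names are invented for the example:
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   
   public class TwoPhaseShutdown {
     // Two-phase shutdown as recommended in the ExecutorService javadoc.
     // Returns true if the pool terminated within the given timeouts.
     static boolean shutdownGracefully(ExecutorService pool, long timeoutSecs) {
       // Phase 1: reject new tasks but let already-submitted tasks finish.
       pool.shutdown();
       try {
         if (!pool.awaitTermination(timeoutSecs, TimeUnit.SECONDS)) {
           // Phase 2: timeout reached; interrupt running tasks so they
           // fail fast instead of touching closed resources (e.g. the DB).
           pool.shutdownNow();
           return pool.awaitTermination(timeoutSecs, TimeUnit.SECONDS);
         }
         return true;
       } catch (InterruptedException e) {
         // Re-cancel if the current thread was interrupted while waiting.
         pool.shutdownNow();
         Thread.currentThread().interrupt();
         return false;
       }
     }
   
     public static void main(String[] args) {
       ExecutorService pool = Executors.newSingleThreadExecutor();
       pool.submit(() -> { /* short-lived cleanup task */ });
       System.out.println(shutdownGracefully(pool, 10L));
     }
   }
   ```
   
   The key point for this PR is that after both phases complete (or fail with a warning), the caller can safely close the DB knowing no cleaner task will write to it afterwards.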



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
