mridulm commented on code in PR #37624:
URL: https://github.com/apache/spark/pull/37624#discussion_r964556864
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -795,10 +801,27 @@ public void registerExecutor(String appId,
ExecutorShuffleInfo executorInfo) {
}
/**
- * Close the DB during shutdown
+ * Shutdown mergedShuffleCleaner and close the DB during shutdown
*/
@Override
public void close() {
+ if (!mergedShuffleCleaner.isShutdown()) {
+ // SPARK-40186:Use two phases shutdown refer to
+ // https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
+ // Use two phases shutdown can prevent new tasks and wait for executing tasks to
+ // complete gracefully, and once timeout is reached, we want to interrupt running tasks,
+ // so that they fail. This is to prevent updates to shuffle state db after it is closed.
+ try {
+ mergedShuffleCleaner.shutdown();
+ // Wait a while for existing tasks to terminate
+ if (!mergedShuffleCleaner.awaitTermination(cleanerShutdownTimeout, TimeUnit.SECONDS)) {
+ shutdownMergedShuffleCleanerNow();
+ }
+ } catch (InterruptedException ignored) {
+ shutdownMergedShuffleCleanerNow();
+ Thread.currentThread().interrupt();
Review Comment:
@Ngone51 Do you mean why we are preserving the interrupt status in this
specific case, or in general?
I can't think of a good reason to do it in this specific case (other than
the fact that it is idiomatic for this sort of usage :) ), since we are already
in NM shutdown at this point.
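
For reference, the general idiom being discussed looks roughly like the sketch
below. It is a standalone illustration based on the two-phase shutdown pattern
in the ExecutorService javadoc, not the code in this PR: the class name
TwoPhaseShutdownSketch, the helper shutdownGracefully and the 5 second timeout
are all made up for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical standalone sketch; none of these names exist in Spark.
final class TwoPhaseShutdownSketch {

  /**
   * Returns true if the pool terminated within the timeout, false if it had
   * to be stopped with shutdownNow() or the calling thread was interrupted.
   */
  static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
    // Phase 1: reject new tasks, let already submitted tasks finish.
    pool.shutdown();
    try {
      if (pool.awaitTermination(timeout, unit)) {
        return true;
      }
      // Phase 2: timeout reached, interrupt whatever is still running.
      pool.shutdownNow();
      return false;
    } catch (InterruptedException e) {
      // The waiting thread itself was interrupted: stop the pool right away...
      pool.shutdownNow();
      // ...and restore the interrupt flag, which was cleared when the
      // exception was thrown, so that callers further up can still see it.
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.submit(() -> System.out.println("cleanup task ran"));
    boolean graceful = shutdownGracefully(pool, 5, TimeUnit.SECONDS);
    System.out.println("graceful=" + graceful + ", terminated=" + pool.isTerminated());
  }
}
```

Restoring the flag only matters if some caller higher up the stack checks it
later. During NM shutdown there is probably nobody left to check, which is why
it is more a matter of idiom than necessity here.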
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]