holdenk commented on code in PR #54529:
URL: https://github.com/apache/spark/pull/54529#discussion_r2875357640


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -279,18 +301,20 @@ class ChecksumCheckpointFileManager(
   }
 
   override def close(): Unit = {
-    threadPool.shutdown()
-    // Wait a bit for it to finish up in case there is any ongoing work
-    // Can consider making this timeout configurable, if needed
-    val timeoutMs = 500
-    if (!threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
-      logWarning(log"Thread pool did not shutdown after ${MDC(TIMEOUT, timeoutMs)} ms," +
-        log" forcing shutdown")
-      threadPool.shutdownNow() // stop the executing tasks
-
-      // Wait a bit for the threads to respond
-      if (!threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
-        logError(log"Thread pool did not terminate")
+    threadPool.foreach { pool =>
+      pool.shutdown()

Review Comment:
   Maybe silly, but would it make sense to do the non-blocking `shutdown()` in a first foreach, and then do the subsequent blocking termination check in a second pass? It's only during shutdown, so not a big deal.
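   For illustration, a minimal sketch of the two-phase shutdown being suggested. This is not the PR's code: the `closeAll` helper and the `Seq` of pools are hypothetical (in the PR, `threadPool` appears to be an `Option`), but the pattern is the same: signal every pool to stop first, then block on each, so the waits overlap instead of running sequentially.

   ```scala
   import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

   // Hypothetical helper sketching the reviewer's suggestion.
   def closeAll(pools: Seq[ExecutorService], timeoutMs: Long = 500): Unit = {
     // Phase 1: non-blocking shutdown() on every pool up front, so they
     // all begin draining concurrently.
     pools.foreach(_.shutdown())

     // Phase 2: only now block on each pool; each wait overlaps with the
     // other pools' draining, so total time is closer to the max than the sum.
     pools.foreach { pool =>
       if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
         pool.shutdownNow() // interrupt still-running tasks
         if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
           // The real code would use logError; println keeps the sketch standalone.
           println("Thread pool did not terminate")
         }
       }
     }
   }

   val pools = Seq(Executors.newFixedThreadPool(2), Executors.newFixedThreadPool(2))
   closeAll(pools)
   assert(pools.forall(_.isTerminated))
   ```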



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

