HeartSaVioR commented on code in PR #42066:
URL: https://github.com/apache/spark/pull/42066#discussion_r1280065414


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1852,6 +1852,17 @@ object SQLConf {
       .createWithDefault(
         "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
 
+  val NUM_STATE_STORE_MAINTENANCE_THREADS =
+    buildConf("spark.sql.streaming.stateStore.numStateStoreMaintenanceThreads")
+      .internal()
+      .doc("Number of threads in the thread pool that perform clean up and snapshotting tasks " +
+        "for stateful streaming queries. The default value is 2 so that this thread pool " +

Review Comment:
   nit: the explanation of the default value is no longer accurate; please update it.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -587,6 +622,8 @@ object StateStore extends Logging {
   /** Stop maintenance thread and reset the maintenance task */
   def stopMaintenanceTask(): Unit = loadedProviders.synchronized {
     if (maintenanceTask != null) {
+      maintenanceThreadPool.stop()

Review Comment:
   1. Let's also clean up the related variables, `threadPoolException` and `maintenancePartitions`. That should be needed for `maintenanceTask` to be re-initialized cleanly.
   2. To be fully safe, I'd use two separate if statements, so that the cleanup of `maintenanceThreadPool` and the cleanup of `maintenanceTask` are handled independently.
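   A minimal sketch of the separated cleanup, assuming simplified stand-in types. `MaintenanceState`, the `Runnable` task, the `String` partition keys, and `isRunning` are hypothetical and only illustrate the shape; the real fields in `StateStore` may differ:

```scala
import java.util.concurrent.{ExecutorService, Executors}
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable

// Hypothetical stand-in for the StateStore companion object (illustration only).
object MaintenanceState {
  private val lock = new Object
  private var maintenanceThreadPool: ExecutorService = null
  private var maintenanceTask: Runnable = null // stand-in for MaintenanceTask
  private val threadPoolException = new AtomicReference[Throwable](null)
  private val maintenancePartitions = mutable.HashSet.empty[String]

  def start(): Unit = lock.synchronized {
    maintenanceThreadPool = Executors.newFixedThreadPool(2)
    maintenanceTask = new Runnable { def run(): Unit = () }
  }

  def isRunning: Boolean = lock.synchronized {
    maintenanceTask != null || maintenanceThreadPool != null
  }

  def stop(): Unit = lock.synchronized {
    // Clean up the pool and the task independently, so that one being
    // null does not prevent the other from being released.
    if (maintenanceThreadPool != null) {
      maintenanceThreadPool.shutdownNow()
      maintenanceThreadPool = null
    }
    if (maintenanceTask != null) {
      maintenanceTask = null
    }
    // Reset the related bookkeeping so a later start() begins from a clean state.
    threadPoolException.set(null)
    maintenancePartitions.clear()
  }
}
```

   With this shape, calling `stop()` twice in a row is harmless, and a later `start()` does not see a stale exception or stale partition set.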



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -628,17 +687,44 @@ object StateStore extends Logging {
     if (SparkEnv.get == null) {
       throw new IllegalStateException("SparkEnv not active, cannot do maintenance on StateStores")
     }
-    loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) =>
-      try {
-        provider.doMaintenance()
-        if (!verifyIfStoreInstanceActive(id)) {
-          unload(id)
-          logInfo(s"Unloaded $provider")
-        }
-      } catch {
-        case NonFatal(e) =>
-          logWarning(s"Error managing $provider, stopping management thread")
-          throw e
+    loadedProviders.synchronized {
+      loadedProviders.toSeq
+    }.foreach { case (id, provider) =>
+      // check exception
+      if (threadPoolException.get() != null) {

Review Comment:
   First, this is not thread-safe even though we use AtomicReference, because we are dealing with two separate operations (the check and the reset).
   Second, if we reset the exception here, other partition(s) will likely still schedule the maintenance task, which may not be what we want. We may want to skip scheduling the others once an error has been found.
   Third, the exception could be null (a race condition can happen: imagine two threads executing L695 concurrently), which could lead to another issue when throwing null.
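   One possible way to address these points, sketched under assumptions: take the error with a single atomic `getAndSet(null)` so that only one caller can observe it, and check once before scheduling anything. `MaintenanceErrorCheck`, `takeError`, and `scheduleAll` are hypothetical names, not part of the PR:

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper (illustration only).
object MaintenanceErrorCheck {
  val threadPoolException = new AtomicReference[Throwable](null)

  /** Atomically reads and clears the recorded error; None if nothing was recorded. */
  def takeError(): Option[Throwable] =
    Option(threadPoolException.getAndSet(null))

  /** Checks for a recorded error once, then schedules every partition. */
  def scheduleAll(partitions: Seq[String])(schedule: String => Unit): Unit = {
    // A single atomic read-and-clear: two racing callers cannot both see the
    // same error, and the loser gets None rather than a null to throw.
    takeError().foreach(e => throw e)
    partitions.foreach(schedule)
  }
}
```

   Because the check happens before the loop, no partition is scheduled once an error has been recorded. Errors recorded while the loop is running are only seen on the next call.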



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -602,22 +639,44 @@ object StateStore extends Logging {
   }
 
   /** Start the periodic maintenance task if not already started and if Spark active */
-  private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit =
+  private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit = {
+    val numMaintenanceThreads = storeConf.numStateStoreMaintenanceThreads
     loadedProviders.synchronized {
       if (SparkEnv.get != null && !isMaintenanceRunning) {
         maintenanceTask = new MaintenanceTask(
           storeConf.maintenanceInterval,
-          task = { doMaintenance() },
-          onError = { loadedProviders.synchronized {
+          task = {
+            doMaintenance()
+          },
+          onError = {
+            loadedProviders.synchronized {
               logInfo("Stopping maintenance task since an error was encountered.")
               stopMaintenanceTask()
+              // SPARK-44504 - Unload explicitly to force closing underlying DB instance

Review Comment:
   nit: this looks odd; this change should already be in master. Could you please rebase onto the latest master?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
