HeartSaVioR commented on code in PR #42066:
URL: https://github.com/apache/spark/pull/42066#discussion_r1280065414
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1852,6 +1852,17 @@ object SQLConf {
.createWithDefault(
"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
+ val NUM_STATE_STORE_MAINTENANCE_THREADS =
+ buildConf("spark.sql.streaming.stateStore.numStateStoreMaintenanceThreads")
+ .internal()
+ .doc("Number of threads in the thread pool that perform clean up and
snapshotting tasks " +
+ "for stateful streaming queries. The default value is 2 so that this
thread pool " +
Review Comment:
nit: the explanation of the default value is no longer accurate; it needs to be updated
(example wording below).
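For example (wording only a suggestion; keep whatever default the PR actually ends up using):

```scala
// Illustrative wording only: avoid hard-coding the default value in the prose,
// since createWithDefault already states it.
.doc("Number of threads in the thread pool that perform clean up and snapshotting tasks " +
  "for stateful streaming queries.")
```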
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -587,6 +622,8 @@ object StateStore extends Logging {
/** Stop maintenance thread and reset the maintenance task */
def stopMaintenanceTask(): Unit = loadedProviders.synchronized {
if (maintenanceTask != null) {
+ maintenanceThreadPool.stop()
Review Comment:
1. Let's clean up the relevant variables as well, `threadPoolException` and
`maintenancePartitions`. That should be necessary before we re-initialize
`maintenanceTask`.
2. For full safety, I'd use two separate if statements to handle the cleanup of
`maintenanceThreadPool` and `maintenanceTask` independently (see the sketch below).
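A rough sketch of what I have in mind (untested; field and method names follow this PR,
and it assumes `maintenanceThreadPool` is a var and `maintenancePartitions` is a mutable
collection - adjust to how they are actually declared):

```scala
/** Stop maintenance thread and reset the maintenance task */
def stopMaintenanceTask(): Unit = loadedProviders.synchronized {
  // Tear down the thread pool and its bookkeeping independently of the periodic task,
  // so a partially initialized state still gets cleaned up.
  if (maintenanceThreadPool != null) {
    threadPoolException.set(null)
    maintenancePartitions.clear()
    maintenanceThreadPool.stop()
    maintenanceThreadPool = null
  }
  if (maintenanceTask != null) {
    maintenanceTask.stop()
    maintenanceTask = null
  }
}
```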
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -628,17 +687,44 @@ object StateStore extends Logging {
if (SparkEnv.get == null) {
throw new IllegalStateException("SparkEnv not active, cannot do maintenance on StateStores")
}
- loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) =>
- try {
- provider.doMaintenance()
- if (!verifyIfStoreInstanceActive(id)) {
- unload(id)
- logInfo(s"Unloaded $provider")
- }
- } catch {
- case NonFatal(e) =>
- logWarning(s"Error managing $provider, stopping management thread")
- throw e
+ loadedProviders.synchronized {
+ loadedProviders.toSeq
+ }.foreach { case (id, provider) =>
+ // check exception
+ if (threadPoolException.get() != null) {
Review Comment:
First, this is not thread-safe even though we use AtomicReference, because we are
performing two separate operations on it (a get followed by a set).
Second, if we reset the exception here, the other partition(s) will likely still
schedule their maintenance tasks, which may not be what we want; we may want to skip
scheduling the remaining partitions once an error has been found.
Third, the exception could be "null" (a race condition can happen - imagine two
threads executing L695 concurrently), which could lead to another issue of throwing
null. A rough sketch of the direction I have in mind is below.
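For illustration only (untested; variable names follow this PR): take the pending
exception atomically once, before iterating the providers, so the get/reset pair cannot
race and no further partition is scheduled after a failure was recorded.

```scala
// Atomically take-and-clear the pending failure recorded by the maintenance thread pool.
val pendingError = threadPoolException.getAndSet(null)
if (pendingError != null) {
  logWarning("Error in maintenanceThreadPool", pendingError)
  throw pendingError
}
loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) =>
  // schedule the maintenance work for this provider as before
}
```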
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -602,22 +639,44 @@ object StateStore extends Logging {
}
/** Start the periodic maintenance task if not already started and if Spark active */
- private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit =
+ private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit = {
+ val numMaintenanceThreads = storeConf.numStateStoreMaintenanceThreads
loadedProviders.synchronized {
if (SparkEnv.get != null && !isMaintenanceRunning) {
maintenanceTask = new MaintenanceTask(
storeConf.maintenanceInterval,
- task = { doMaintenance() },
- onError = { loadedProviders.synchronized {
+ task = {
+ doMaintenance()
+ },
+ onError = {
+ loadedProviders.synchronized {
logInfo("Stopping maintenance task since an error was
encountered.")
stopMaintenanceTask()
+ // SPARK-44504 - Unload explicitly to force closing underlying DB instance
Review Comment:
nit: this looks odd - this change should already be done in master. Could you please
rebase with the latest master?