HeartSaVioR commented on code in PR #42012:
URL: https://github.com/apache/spark/pull/42012#discussion_r1264640356
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1852,6 +1852,15 @@ object SQLConf {
.createWithDefault(
"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
+ val STATE_STORE_MAINTENANCE_THREADS =
+ buildConf("spark.sql.streaming.stateStore.stateStoreMaintenanceThreads")
Review Comment:
nit: `numStateStoreMaintenanceThreads` to make clear what this configures.
Also, `NUM_STATE_STORE_MAINTENANCE_THREADS` for variable name, and
`numStateStoreMaintenanceThreads` in method name.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -471,9 +479,46 @@ object StateStore extends Logging {
def isRunning: Boolean = !future.isDone
}
+ /**
+ * Thread Pool that runs maintenance on partitions that are scheduled by
+ * MaintenanceTask periodically
+ */
+ class MaintenanceThreadPool(numThreads: Int) {
+ private val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+ numThreads, "state-store-maintenance-thread")
+
+ def execute(runnable: Runnable): Unit = {
+ threadPool.execute(runnable)
+ }
+
+ def isRunning(): Boolean = {
+ !threadPool.isShutdown
+ }
+
+ def stop(): Unit = {
+ threadPool.shutdown()
+ }
+ }
+
+ private def addPartition(providerId: StateStoreProviderId): Unit = {
+ partitions.add(providerId)
+ }
+
+ private def partitionIsQueued(providerId: StateStoreProviderId): Boolean = {
+ partitions.contains(providerId)
+ }
+
+ private def removePartition(providerId: StateStoreProviderId): Unit = {
+ partitions.remove(providerId)
+ }
+
Review Comment:
nit: consolidate two empty lines into one
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -439,6 +439,14 @@ object StateStore extends Logging {
@GuardedBy("loadedProviders")
private val schemaValidated = new mutable.HashMap[StateStoreProviderId,
Option[Throwable]]()
+ private val threadPoolLock = new Object
Review Comment:
(No need to address, just thinking out loud)
I wonder whether there is a way to abstract this better, e.g. putting all the
details for the maintenance task into the MaintenanceTask class. But that does
not look feasible unless we change MaintenanceTask so that it contains the
actual task implementation. I'd prefer that, but it does not need to be in the
scope of this PR.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -439,6 +439,14 @@ object StateStore extends Logging {
@GuardedBy("loadedProviders")
private val schemaValidated = new mutable.HashMap[StateStoreProviderId,
Option[Throwable]]()
+ private val threadPoolLock = new Object
+
+ @GuardedBy("threadPoolLock")
+ private var threadPoolException: Throwable = null
+
+ @GuardedBy("threadPoolLock")
+ private val partitions = new mutable.HashSet[StateStoreProviderId]
Review Comment:
nit: the name is too general to indicate what it will be used for. Could we
please either add a code comment to explain it, or rename it to be slightly more verbose?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -471,9 +479,46 @@ object StateStore extends Logging {
def isRunning: Boolean = !future.isDone
}
+ /**
+ * Thread Pool that runs maintenance on partitions that are scheduled by
+ * MaintenanceTask periodically
+ */
+ class MaintenanceThreadPool(numThreads: Int) {
+ private val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+ numThreads, "state-store-maintenance-thread")
+
+ def execute(runnable: Runnable): Unit = {
+ threadPool.execute(runnable)
+ }
+
+ def isRunning(): Boolean = {
+ !threadPool.isShutdown
+ }
+
+ def stop(): Unit = {
+ threadPool.shutdown()
+ }
+ }
+
+ private def addPartition(providerId: StateStoreProviderId): Unit = {
+ partitions.add(providerId)
+ }
+
+ private def partitionIsQueued(providerId: StateStoreProviderId): Boolean = {
Review Comment:
nit: same feedback for "too general"
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -471,9 +479,46 @@ object StateStore extends Logging {
def isRunning: Boolean = !future.isDone
}
+ /**
+ * Thread Pool that runs maintenance on partitions that are scheduled by
+ * MaintenanceTask periodically
+ */
+ class MaintenanceThreadPool(numThreads: Int) {
+ private val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+ numThreads, "state-store-maintenance-thread")
+
+ def execute(runnable: Runnable): Unit = {
+ threadPool.execute(runnable)
+ }
+
+ def isRunning(): Boolean = {
+ !threadPool.isShutdown
+ }
+
+ def stop(): Unit = {
+ threadPool.shutdown()
+ }
+ }
+
+ private def addPartition(providerId: StateStoreProviderId): Unit = {
+ partitions.add(providerId)
+ }
+
+ private def partitionIsQueued(providerId: StateStoreProviderId): Boolean = {
+ partitions.contains(providerId)
+ }
+
+ private def removePartition(providerId: StateStoreProviderId): Unit = {
Review Comment:
nit: same feedback for "too general"
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala:
##########
@@ -27,6 +27,12 @@ class StateStoreConf(
def this() = this(new SQLConf)
+ /**
+ * Size of MaintenanceThreadPool to perform maintenance tasks
+ * for StateStore
+ */
+ val maintenanceThreadPoolSizeForStateStore: Int =
sqlConf.stateStoreMaintenanceThreads
Review Comment:
If we are going to have different names here, it probably means that at least
one of the names is not clear.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -471,9 +479,46 @@ object StateStore extends Logging {
def isRunning: Boolean = !future.isDone
}
+ /**
+ * Thread Pool that runs maintenance on partitions that are scheduled by
+ * MaintenanceTask periodically
+ */
+ class MaintenanceThreadPool(numThreads: Int) {
+ private val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+ numThreads, "state-store-maintenance-thread")
+
+ def execute(runnable: Runnable): Unit = {
+ threadPool.execute(runnable)
+ }
+
+ def isRunning(): Boolean = {
+ !threadPool.isShutdown
+ }
+
+ def stop(): Unit = {
+ threadPool.shutdown()
+ }
+ }
+
+ private def addPartition(providerId: StateStoreProviderId): Unit = {
Review Comment:
nit: same feedback for "too general"
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -439,6 +439,14 @@ object StateStore extends Logging {
@GuardedBy("loadedProviders")
private val schemaValidated = new mutable.HashMap[StateStoreProviderId,
Option[Throwable]]()
+ private val threadPoolLock = new Object
Review Comment:
Same for the new threadPool and partitions - they are two different instances
which could ideally be abstracted into one. That said, I'm still OK with
deferring the refactor until later - at least this change follows the existing
approach, and a later refactor could deal with both levels of pools altogether.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]