Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/16627#discussion_r96778846
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
---
@@ -34,6 +35,132 @@ import org.apache.spark.util.ThreadUtils
/** Unique identifier for a [[StateStore]] */
case class StateStoreId(checkpointLocation: String, operatorId: Long,
partitionId: Int)
+/**
+ * The class to maintain [[StateStore]]s. When a SparkContext is active
(i.e. SparkEnv.get is not
+ * null), it will run a periodic background task to do maintenance on the
loaded stores. The
+ * background task will be cancelled when `stop` is called or
`SparkEnv.get` becomes `null`.
+ */
+class StateStoreContext extends Logging {
+ import StateStore._
+
+ private val maintenanceTaskExecutor =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task")
+
+ @GuardedBy("StateStore.LOCK")
+ private val loadedProviders = new mutable.HashMap[StateStoreId,
StateStoreProvider]()
+
+ @GuardedBy("StateStore.LOCK")
+ private var _coordRef: StateStoreCoordinatorRef = null
+
+ @GuardedBy("StateStore.LOCK")
+ private var isStopped: Boolean = false
+
+ /** Get the state store provider or add `stateStoreProvider` if not
exist */
+ def getOrElseUpdate(
+ storeId: StateStoreId,
+ stateStoreProvider: => StateStoreProvider): StateStoreProvider =
LOCK.synchronized {
+ loadedProviders.getOrElseUpdate(storeId, stateStoreProvider)
+ }
+
+ /** Unload a state store provider */
+ def unload(storeId: StateStoreId): Unit = LOCK.synchronized {
loadedProviders.remove(storeId) }
+
+ /** Whether a state store provider is loaded or not */
+ def isLoaded(storeId: StateStoreId): Boolean = LOCK.synchronized {
--- End diff --
Why do these methods need to be synchronized on the external lock (`StateStore.LOCK`)?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]