Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16627#discussion_r96779672
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 ---
    @@ -34,6 +35,132 @@ import org.apache.spark.util.ThreadUtils
     /** Unique identifier for a [[StateStore]] */
     case class StateStoreId(checkpointLocation: String, operatorId: Long, 
partitionId: Int)
     
    +/**
    + * The class to maintain [[StateStore]]s. When a SparkContext is active 
(i.e. SparkEnv.get is not
    + * null), it will run a periodic background task to do maintenance on the 
loaded stores. The
    + * background task will be cancelled when `stop` is called or 
`SparkEnv.get` becomes `null`.
    + */
    +class StateStoreContext extends Logging {
    +  import StateStore._
    +
    +  private val maintenanceTaskExecutor =
    +    
ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task")
    +
    +  @GuardedBy("StateStore.LOCK")
    +  private val loadedProviders = new mutable.HashMap[StateStoreId, 
StateStoreProvider]()
    +
    +  @GuardedBy("StateStore.LOCK")
    +  private var _coordRef: StateStoreCoordinatorRef = null
    +
    +  @GuardedBy("StateStore.LOCK")
    +  private var isStopped: Boolean = false
    +
    +  /** Get the state store provider or add `stateStoreProvider` if not 
exist */
    +  def getOrElseUpdate(
    +      storeId: StateStoreId,
    +      stateStoreProvider: => StateStoreProvider): StateStoreProvider = 
LOCK.synchronized {
    +    loadedProviders.getOrElseUpdate(storeId, stateStoreProvider)
    +  }
    +
    +  /** Unload a state store provider */
    +  def unload(storeId: StateStoreId): Unit = LOCK.synchronized { 
loadedProviders.remove(storeId) }
    +
    +  /** Whether a state store provider is loaded or not */
    +  def isLoaded(storeId: StateStoreId): Boolean = LOCK.synchronized {
    +    loadedProviders.contains(storeId)
    +  }
    +
    +  /** Whether the maintenance task is running */
    +  def isMaintenanceRunning: Boolean = LOCK.synchronized { !isStopped }
    +
    +  /** Unload and stop all state store providers */
    +  def stop(): Unit = LOCK.synchronized {
    +    if (!isStopped) {
    +      isStopped = true
    +      loadedProviders.clear()
    +      maintenanceTaskExecutor.shutdown()
    +      logInfo("StateStore stopped")
    +    }
    +  }
    +
    +  /** Start the periodic maintenance task if not already started and if 
Spark active */
    +  private def startMaintenance(): Unit = {
    +    val env = SparkEnv.get
    +    if (env != null) {
    +      val periodMs = env.conf.getTimeAsMs(
    +        MAINTENANCE_INTERVAL_CONFIG, 
s"${MAINTENANCE_INTERVAL_DEFAULT_SECS}s")
    +      val runnable = new Runnable {
    +        override def run(): Unit = { doMaintenance() }
    +      }
    +      maintenanceTaskExecutor.scheduleAtFixedRate(
    +        runnable, periodMs, periodMs, TimeUnit.MILLISECONDS)
    +      logInfo("State Store maintenance task started")
    +    }
    +  }
    +
    +  /**
    +   * Execute background maintenance task in all the loaded store providers 
if they are still
    +   * the active instances according to the coordinator.
    +   */
    +  private def doMaintenance(): Unit = {
    +    logDebug("Doing maintenance")
    +    if (SparkEnv.get == null) {
    +      stop()
    +    } else {
    +      LOCK.synchronized { loadedProviders.toSeq }.foreach { case (id, 
provider) =>
    --- End diff --
    
    This is locking the state store while maintenance is going on. Since it 
uses the same lock as the external lock, the task using the store will 
block on the maintenance task.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to