[GitHub] spark pull request #16627: [SPARK-19267][SS]Fix a race condition when stoppi...
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16627

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16627#discussion_r97163440

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -179,14 +210,14 @@ object StateStore extends Logging {

   /** Start the periodic maintenance task if not already started and if Spark active */
   private def startMaintenanceIfNeeded(): Unit = loadedProviders.synchronized {
     val env = SparkEnv.get
-    if (maintenanceTask == null && env != null) {
+    if (env != null && (maintenanceTask == null || !maintenanceTask.isRunning)) {
--- End diff --

Can you replace this with the method `isMaintenanceRunning`?
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16627#discussion_r97159995

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -124,12 +125,42 @@ object StateStore extends Logging {
   val MAINTENANCE_INTERVAL_CONFIG = "spark.sql.streaming.stateStore.maintenanceInterval"
   val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60

+  @GuardedBy("loadedProviders")
   private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]()

-  private val maintenanceTaskExecutor =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task")
-  @volatile private var maintenanceTask: ScheduledFuture[_] = null
-  @volatile private var _coordRef: StateStoreCoordinatorRef = null
+  class MaintenanceTask(periodMs: Long, task: => Unit, onError: => Unit) {
--- End diff --

Can you add docs?
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16627#discussion_r97162573

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -124,12 +125,42 @@ object StateStore extends Logging {
   val MAINTENANCE_INTERVAL_CONFIG = "spark.sql.streaming.stateStore.maintenanceInterval"
   val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60

+  @GuardedBy("loadedProviders")
   private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]()

-  private val maintenanceTaskExecutor =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task")
-  @volatile private var maintenanceTask: ScheduledFuture[_] = null
-  @volatile private var _coordRef: StateStoreCoordinatorRef = null
+  class MaintenanceTask(periodMs: Long, task: => Unit, onError: => Unit) {
--- End diff --

You should mention the properties of this class: that it automatically cancels the periodic task if there is an exception, and what `onError` is for.
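For readers following the thread, the kind of class being discussed above — a periodic task that cancels itself when the body throws, invoking an error callback first — can be sketched as follows. This is an illustrative Java sketch, not Spark's actual Scala implementation; the name `MaintenanceTask` is borrowed from the diff for readability.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of a self-cancelling periodic task.
// If any run throws, onError is invoked and the exception is rethrown,
// which makes scheduleAtFixedRate suppress all subsequent executions.
class MaintenanceTask {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "maintenance-task");
            t.setDaemon(true);
            return t;
        });
    private final ScheduledFuture<?> future;

    MaintenanceTask(long periodMs, Runnable task, Runnable onError) {
        Runnable wrapped = () -> {
            try {
                task.run();
            } catch (RuntimeException | Error e) {
                onError.run();
                throw e; // rethrowing cancels the periodic schedule
            }
        };
        future = executor.scheduleAtFixedRate(
            wrapped, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    // The future is "done" once it is cancelled or a run has thrown.
    boolean isRunning() {
        return !future.isDone();
    }

    void stop() {
        future.cancel(false);
        executor.shutdown();
    }
}
```

Note how `isRunning` falls out of the executor API for free: a `ScheduledFuture` completes (exceptionally) as soon as one execution throws, which is exactly the "automatically cancels on exception" property the reviewer asks to document.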
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16627#discussion_r96779672

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -34,6 +35,132 @@ import org.apache.spark.util.ThreadUtils

 /** Unique identifier for a [[StateStore]] */
 case class StateStoreId(checkpointLocation: String, operatorId: Long, partitionId: Int)

+/**
+ * The class to maintain [[StateStore]]s. When a SparkContext is active (i.e. SparkEnv.get is not
+ * null), it will run a periodic background task to do maintenance on the loaded stores. The
+ * background task will be cancelled when `stop` is called or `SparkEnv.get` becomes `null`.
+ */
+class StateStoreContext extends Logging {
+  import StateStore._
+
+  private val maintenanceTaskExecutor =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task")
+
+  @GuardedBy("StateStore.LOCK")
+  private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]()
+
+  @GuardedBy("StateStore.LOCK")
+  private var _coordRef: StateStoreCoordinatorRef = null
+
+  @GuardedBy("StateStore.LOCK")
+  private var isStopped: Boolean = false
+
+  /** Get the state store provider or add `stateStoreProvider` if not exist */
+  def getOrElseUpdate(
+      storeId: StateStoreId,
+      stateStoreProvider: => StateStoreProvider): StateStoreProvider = LOCK.synchronized {
+    loadedProviders.getOrElseUpdate(storeId, stateStoreProvider)
+  }
+
+  /** Unload a state store provider */
+  def unload(storeId: StateStoreId): Unit = LOCK.synchronized { loadedProviders.remove(storeId) }
+
+  /** Whether a state store provider is loaded or not */
+  def isLoaded(storeId: StateStoreId): Boolean = LOCK.synchronized {
+    loadedProviders.contains(storeId)
+  }
+
+  /** Whether the maintenance task is running */
+  def isMaintenanceRunning: Boolean = LOCK.synchronized { !isStopped }
+
+  /** Unload and stop all state store providers */
+  def stop(): Unit = LOCK.synchronized {
+    if (!isStopped) {
+      isStopped = true
+      loadedProviders.clear()
+      maintenanceTaskExecutor.shutdown()
+      logInfo("StateStore stopped")
+    }
+  }
+
+  /** Start the periodic maintenance task if not already started and if Spark active */
+  private def startMaintenance(): Unit = {
+    val env = SparkEnv.get
+    if (env != null) {
+      val periodMs = env.conf.getTimeAsMs(
+        MAINTENANCE_INTERVAL_CONFIG, s"${MAINTENANCE_INTERVAL_DEFAULT_SECS}s")
+      val runnable = new Runnable {
+        override def run(): Unit = { doMaintenance() }
+      }
+      maintenanceTaskExecutor.scheduleAtFixedRate(
+        runnable, periodMs, periodMs, TimeUnit.MILLISECONDS)
+      logInfo("State Store maintenance task started")
+    }
+  }
+
+  /**
+   * Execute background maintenance task in all the loaded store providers if they are still
+   * the active instances according to the coordinator.
+   */
+  private def doMaintenance(): Unit = {
+    logDebug("Doing maintenance")
+    if (SparkEnv.get == null) {
+      stop()
+    } else {
+      LOCK.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) =>
--- End diff --

This is locking the state store while maintenance is going on. Since it is using the same lock as the external lock, the task using the store will block on the maintenance task.
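The locking pattern under discussion — snapshotting the provider map while holding the lock, then doing the (potentially slow) per-provider maintenance work outside it — can be illustrated with a minimal sketch. This is hedged, hypothetical Java (`ProviderRegistry` is a made-up name), not the Spark code, and it shows only the copy-under-lock idiom itself:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical registry illustrating copy-under-lock, work-outside-lock.
class ProviderRegistry {
    private final Object lock = new Object();
    private final Map<String, Runnable> loadedProviders = new HashMap<>();

    void load(String id, Runnable provider) {
        synchronized (lock) {
            loadedProviders.put(id, provider);
        }
    }

    void doMaintenance() {
        // Hold the lock only long enough to copy the current providers...
        List<Runnable> snapshot;
        synchronized (lock) {
            snapshot = new ArrayList<>(loadedProviders.values());
        }
        // ...then run the potentially slow maintenance without the lock,
        // so concurrent load() callers are not blocked for the whole pass.
        for (Runnable provider : snapshot) {
            provider.run();
        }
    }
}
```

The trade-off the reviewers are weighing is exactly this: the snapshot keeps the critical section short, but any code that reacquires the same lock inside the maintenance body would still contend with tasks using the store.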
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16627#discussion_r96778846

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -34,6 +35,132 @@ import org.apache.spark.util.ThreadUtils

 /** Unique identifier for a [[StateStore]] */
 case class StateStoreId(checkpointLocation: String, operatorId: Long, partitionId: Int)

+/**
+ * The class to maintain [[StateStore]]s. When a SparkContext is active (i.e. SparkEnv.get is not
+ * null), it will run a periodic background task to do maintenance on the loaded stores. The
+ * background task will be cancelled when `stop` is called or `SparkEnv.get` becomes `null`.
+ */
+class StateStoreContext extends Logging {
+  import StateStore._
+
+  private val maintenanceTaskExecutor =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task")
+
+  @GuardedBy("StateStore.LOCK")
+  private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]()
+
+  @GuardedBy("StateStore.LOCK")
+  private var _coordRef: StateStoreCoordinatorRef = null
+
+  @GuardedBy("StateStore.LOCK")
+  private var isStopped: Boolean = false
+
+  /** Get the state store provider or add `stateStoreProvider` if not exist */
+  def getOrElseUpdate(
+      storeId: StateStoreId,
+      stateStoreProvider: => StateStoreProvider): StateStoreProvider = LOCK.synchronized {
+    loadedProviders.getOrElseUpdate(storeId, stateStoreProvider)
+  }
+
+  /** Unload a state store provider */
+  def unload(storeId: StateStoreId): Unit = LOCK.synchronized { loadedProviders.remove(storeId) }
+
+  /** Whether a state store provider is loaded or not */
+  def isLoaded(storeId: StateStoreId): Boolean = LOCK.synchronized {
--- End diff --

Why do these need to be synchronized by the external LOCK?
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16627#discussion_r96545362

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -198,7 +214,7 @@ object StateStore extends Logging {

   private def doMaintenance(): Unit = {
     logDebug("Doing maintenance")
     if (SparkEnv.get == null) {
--- End diff --

There is another potential issue here: if a SparkContext is created after checking `SparkEnv.get == null`, the following `stop` may cancel a new valid task. However, I think that won't happen in practice, so don't fix it.
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/16627

[SPARK-19267][SS] Fix a race condition when stopping StateStore

## What changes were proposed in this pull request?

There is a race condition when stopping StateStore which makes `StateStoreSuite.maintenance` flaky. `StateStore.stop` doesn't wait for the running task to finish, and an out-of-date task may fail `doMaintenance` and cancel the new task. Here is a reproducer: https://github.com/zsxwing/spark/commit/dde1b5b106ba034861cf19e16883cfe181faa6f3

This PR changes `stop` to `stop(blocking: Boolean = true)` to allow the caller to wait until the background thread exits.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-19267

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16627.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16627

commit 5fa096c3e11285429100155cdfeb8cc50ef653a8
Author: Shixiong Zhu
Date: 2017-01-18T00:30:35Z

    Fix a race condition when stopping StateStore
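The `stop(blocking: Boolean = true)` change described in the PR maps onto the standard shutdown-and-await executor idiom. A rough Java analogue, under stated assumptions (`StoppableMaintenance` is a made-up name; the real fix lives in StateStore.scala and this sketch only shows the blocking behaviour):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Made-up analogue of stop(blocking): shut down the scheduler and,
// when blocking, wait for the background thread to actually exit.
class StoppableMaintenance {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "state-store-maintenance-task");
            t.setDaemon(true);
            return t;
        });

    StoppableMaintenance(long periodMs, Runnable task) {
        executor.scheduleAtFixedRate(task, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    // With blocking = true, the caller returns only after any in-flight
    // run has finished, so a stale maintenance run cannot race with
    // state the caller sets up next -- the flakiness the PR describes.
    void stop(boolean blocking) throws InterruptedException {
        executor.shutdownNow();
        if (blocking) {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}
```

Without the wait, `stop` returns while the old task may still be mid-run, which is exactly how the out-of-date task could fail `doMaintenance` and cancel a newly scheduled one.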