[GitHub] spark pull request #16627: [SPARK-19267][SS]Fix a race condition when stopping StateStore

2017-01-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16627


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16627: [SPARK-19267][SS]Fix a race condition when stopping StateStore

2017-01-20 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16627#discussion_r97163440
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -179,14 +210,14 @@ object StateStore extends Logging {
   /** Start the periodic maintenance task if not already started and if Spark active */
   private def startMaintenanceIfNeeded(): Unit = loadedProviders.synchronized {
     val env = SparkEnv.get
-    if (maintenanceTask == null && env != null) {
+    if (env != null && (maintenanceTask == null || !maintenanceTask.isRunning)) {
--- End diff --

Can you replace this with the method `isMaintenanceRunning`?
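For readers following along, the suggested refactor is to fold the null check and the running check into a single query method. A self-contained sketch of that shape (the class and method bodies here are illustrative stand-ins, not code from the PR):

```scala
// Hypothetical stand-in for the real ScheduledFuture-backed task handle.
class MaintenanceHandle {
  @volatile private var running = true
  def cancel(): Unit = { running = false }
  def isRunning: Boolean = running
}

object StateStoreSketch {
  @volatile private var maintenanceTask: MaintenanceHandle = null

  // The helper the review comment refers to (name taken from the diff).
  def isMaintenanceRunning: Boolean =
    maintenanceTask != null && maintenanceTask.isRunning

  // Replaces the inline `maintenanceTask == null || !maintenanceTask.isRunning`.
  def startMaintenanceIfNeeded(envIsActive: Boolean): Boolean = {
    if (envIsActive && !isMaintenanceRunning) {
      maintenanceTask = new MaintenanceHandle
      true  // (re)started the task
    } else {
      false // already running, or no active SparkEnv
    }
  }
}
```

The call site then reads as a single condition, which is the point of the comment.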





[GitHub] spark pull request #16627: [SPARK-19267][SS]Fix a race condition when stopping StateStore

2017-01-20 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16627#discussion_r97159995
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -124,12 +125,42 @@ object StateStore extends Logging {
   val MAINTENANCE_INTERVAL_CONFIG = "spark.sql.streaming.stateStore.maintenanceInterval"
   val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60
 
+  @GuardedBy("loadedProviders")
   private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]()
-  private val maintenanceTaskExecutor =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task")
 
-  @volatile private var maintenanceTask: ScheduledFuture[_] = null
-  @volatile private var _coordRef: StateStoreCoordinatorRef = null
+  class MaintenanceTask(periodMs: Long, task: => Unit, onError: => Unit) {
--- End diff --

Can you add docs?





[GitHub] spark pull request #16627: [SPARK-19267][SS]Fix a race condition when stopping StateStore

2017-01-20 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16627#discussion_r97162573
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -124,12 +125,42 @@ object StateStore extends Logging {
   val MAINTENANCE_INTERVAL_CONFIG = "spark.sql.streaming.stateStore.maintenanceInterval"
   val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60
 
+  @GuardedBy("loadedProviders")
   private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]()
-  private val maintenanceTaskExecutor =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task")
 
-  @volatile private var maintenanceTask: ScheduledFuture[_] = null
-  @volatile private var _coordRef: StateStoreCoordinatorRef = null
+  class MaintenanceTask(periodMs: Long, task: => Unit, onError: => Unit) {
--- End diff --

You should mention the properties of this class: that it automatically cancels the periodic task if there is an exception, and what `onError` is for.
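To illustrate the properties being asked about, here is a self-contained sketch (not the PR's actual class, which takes by-name parameters and uses Spark's `ThreadUtils`) of a `MaintenanceTask` with the kind of doc comment the review requests: it runs `task` every `periodMs`, cancels the periodic schedule on the first exception, and invokes `onError` once when that happens.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

/**
 * Periodically runs `task` every `periodMs` milliseconds on a daemon thread.
 *
 * Properties (the ones the review asks to document):
 *  - If `task` throws, the periodic schedule is cancelled automatically;
 *    `isRunning` then returns false.
 *  - `onError` is a callback invoked once, right before cancellation, so the
 *    owner can clean up (e.g. unload providers).
 *
 * Illustrative sketch only; names and signatures differ from the PR.
 */
class MaintenanceTask(periodMs: Long, task: () => Unit, onError: () => Unit) {
  private val executor = Executors.newSingleThreadScheduledExecutor { r =>
    val t = new Thread(r); t.setDaemon(true); t
  }

  private val runnable = new Runnable {
    override def run(): Unit = {
      try task() catch {
        case _: Throwable =>
          onError()
          stop() // cancel the periodic schedule on the first failure
      }
    }
  }

  private val future: ScheduledFuture[_] =
    executor.scheduleAtFixedRate(runnable, periodMs, periodMs, TimeUnit.MILLISECONDS)

  def isRunning: Boolean = !future.isDone

  def stop(): Unit = {
    future.cancel(false)
    executor.shutdown()
  }
}
```

Catching inside `run` matters: with `scheduleAtFixedRate`, an uncaught exception would silently suppress all further executions without ever telling the owner, which is exactly what `onError` exists to surface.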





[GitHub] spark pull request #16627: [SPARK-19267][SS]Fix a race condition when stopping StateStore

2017-01-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16627#discussion_r96779672
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -34,6 +35,132 @@ import org.apache.spark.util.ThreadUtils
 /** Unique identifier for a [[StateStore]] */
 case class StateStoreId(checkpointLocation: String, operatorId: Long, partitionId: Int)
 
+/**
+ * The class to maintain [[StateStore]]s. When a SparkContext is active (i.e. SparkEnv.get is not
+ * null), it will run a periodic background task to do maintenance on the loaded stores. The
+ * background task will be cancelled when `stop` is called or `SparkEnv.get` becomes `null`.
+ */
+class StateStoreContext extends Logging {
+  import StateStore._
+
+  private val maintenanceTaskExecutor =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task")
+
+  @GuardedBy("StateStore.LOCK")
+  private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]()
+
+  @GuardedBy("StateStore.LOCK")
+  private var _coordRef: StateStoreCoordinatorRef = null
+
+  @GuardedBy("StateStore.LOCK")
+  private var isStopped: Boolean = false
+
+  /** Get the state store provider or add `stateStoreProvider` if not exist */
+  def getOrElseUpdate(
+      storeId: StateStoreId,
+      stateStoreProvider: => StateStoreProvider): StateStoreProvider = LOCK.synchronized {
+    loadedProviders.getOrElseUpdate(storeId, stateStoreProvider)
+  }
+
+  /** Unload a state store provider */
+  def unload(storeId: StateStoreId): Unit = LOCK.synchronized { loadedProviders.remove(storeId) }
+
+  /** Whether a state store provider is loaded or not */
+  def isLoaded(storeId: StateStoreId): Boolean = LOCK.synchronized {
+    loadedProviders.contains(storeId)
+  }
+
+  /** Whether the maintenance task is running */
+  def isMaintenanceRunning: Boolean = LOCK.synchronized { !isStopped }
+
+  /** Unload and stop all state store providers */
+  def stop(): Unit = LOCK.synchronized {
+    if (!isStopped) {
+      isStopped = true
+      loadedProviders.clear()
+      maintenanceTaskExecutor.shutdown()
+      logInfo("StateStore stopped")
+    }
+  }
+
+  /** Start the periodic maintenance task if not already started and if Spark active */
+  private def startMaintenance(): Unit = {
+    val env = SparkEnv.get
+    if (env != null) {
+      val periodMs = env.conf.getTimeAsMs(
+        MAINTENANCE_INTERVAL_CONFIG, s"${MAINTENANCE_INTERVAL_DEFAULT_SECS}s")
+      val runnable = new Runnable {
+        override def run(): Unit = { doMaintenance() }
+      }
+      maintenanceTaskExecutor.scheduleAtFixedRate(
+        runnable, periodMs, periodMs, TimeUnit.MILLISECONDS)
+      logInfo("State Store maintenance task started")
+    }
+  }
+
+  /**
+   * Execute background maintenance task in all the loaded store providers if they are still
+   * the active instances according to the coordinator.
+   */
+  private def doMaintenance(): Unit = {
+    logDebug("Doing maintenance")
+    if (SparkEnv.get == null) {
+      stop()
+    } else {
+      LOCK.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) =>
--- End diff --

This is locking the state store while maintenance is going on. Since it uses the same lock as the external lock, a task using the store will block on the maintenance task.
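The code under discussion uses a snapshot-then-iterate pattern: copy the map to an immutable sequence while holding the lock, then iterate with the lock released. A minimal, self-contained sketch of that pattern (names here are illustrative, not the PR's):

```scala
import scala.collection.mutable

object SnapshotIteration {
  private val lock = new Object
  private val loaded = new mutable.HashMap[String, Int]()

  def put(k: String, v: Int): Unit = lock.synchronized { loaded.put(k, v) }

  def size: Int = lock.synchronized { loaded.size }

  /**
   * Takes a snapshot under the lock, then runs `f` on each entry with the
   * lock released, so the lock is only held for the cheap copy, not for the
   * whole (potentially slow) sweep.
   */
  def sweep(f: ((String, Int)) => Unit): Unit = {
    val snapshot = lock.synchronized { loaded.toSeq }
    snapshot.foreach(f) // lock no longer held here
  }
}
```

Because `f` runs against a snapshot, the live map can be safely mutated mid-sweep; the trade-off is that the sweep may observe a slightly stale view of the providers.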





[GitHub] spark pull request #16627: [SPARK-19267][SS]Fix a race condition when stopping StateStore

2017-01-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16627#discussion_r96778846
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -34,6 +35,132 @@ import org.apache.spark.util.ThreadUtils
 /** Unique identifier for a [[StateStore]] */
 case class StateStoreId(checkpointLocation: String, operatorId: Long, partitionId: Int)
 
+/**
+ * The class to maintain [[StateStore]]s. When a SparkContext is active (i.e. SparkEnv.get is not
+ * null), it will run a periodic background task to do maintenance on the loaded stores. The
+ * background task will be cancelled when `stop` is called or `SparkEnv.get` becomes `null`.
+ */
+class StateStoreContext extends Logging {
+  import StateStore._
+
+  private val maintenanceTaskExecutor =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task")
+
+  @GuardedBy("StateStore.LOCK")
+  private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]()
+
+  @GuardedBy("StateStore.LOCK")
+  private var _coordRef: StateStoreCoordinatorRef = null
+
+  @GuardedBy("StateStore.LOCK")
+  private var isStopped: Boolean = false
+
+  /** Get the state store provider or add `stateStoreProvider` if not exist */
+  def getOrElseUpdate(
+      storeId: StateStoreId,
+      stateStoreProvider: => StateStoreProvider): StateStoreProvider = LOCK.synchronized {
+    loadedProviders.getOrElseUpdate(storeId, stateStoreProvider)
+  }
+
+  /** Unload a state store provider */
+  def unload(storeId: StateStoreId): Unit = LOCK.synchronized { loadedProviders.remove(storeId) }
+
+  /** Whether a state store provider is loaded or not */
+  def isLoaded(storeId: StateStoreId): Boolean = LOCK.synchronized {
--- End diff --

Why do these need to be synchronized by the external `LOCK`?





[GitHub] spark pull request #16627: [SPARK-19267][SS]Fix a race condition when stopping StateStore

2017-01-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16627#discussion_r96545362
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -198,7 +214,7 @@ object StateStore extends Logging {
   private def doMaintenance(): Unit = {
     logDebug("Doing maintenance")
     if (SparkEnv.get == null) {
--- End diff --

There is another potential issue here: if a SparkContext is created after 
checking `SparkEnv.get == null`, the following `stop` may cancel a new valid 
task. However, I think that won't happen in practice, so don't fix it.





[GitHub] spark pull request #16627: [SPARK-19267][SS]Fix a race condition when stopping StateStore

2017-01-17 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/16627

[SPARK-19267][SS]Fix a race condition when stopping StateStore

## What changes were proposed in this pull request?

There is a race condition when stopping StateStore which makes 
`StateStoreSuite.maintenance` flaky. `StateStore.stop` doesn't wait for the 
running task to finish, and an out-of-date task may fail `doMaintenance` and 
cancel the new task. Here is a reproducer: 
https://github.com/zsxwing/spark/commit/dde1b5b106ba034861cf19e16883cfe181faa6f3

This PR changes `stop` to `stop(blocking: Boolean = true)` to allow the caller to wait until the background thread exits.
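The shape of such a blocking stop can be sketched in isolation with plain `java.util.concurrent` primitives (class and member names here are illustrative, not the PR's code, which works on Spark's `StateStore` object):

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Minimal stand-in for a component with a periodic background task.
class BackgroundMaintenance(periodMs: Long)(task: () => Unit) {
  private val executor = Executors.newSingleThreadScheduledExecutor { r =>
    val t = new Thread(r, "maintenance"); t.setDaemon(true); t
  }
  executor.scheduleAtFixedRate(
    new Runnable { override def run(): Unit = task() },
    periodMs, periodMs, TimeUnit.MILLISECONDS)

  /**
   * Stops the periodic task. With `blocking = true`, the caller waits until
   * the worker has fully exited, so an in-flight run cannot race with
   * whatever the caller does next -- the kind of race this PR fixes.
   */
  def stop(blocking: Boolean = true): Unit = {
    executor.shutdown()
    if (blocking) {
      executor.awaitTermination(60, TimeUnit.SECONDS)
    }
  }

  def isStopped: Boolean = executor.isTerminated
}
```

`shutdown()` alone returns immediately while a scheduled run may still be executing; it is the `awaitTermination` in the blocking path that closes the window the flaky test was hitting.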

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-19267

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16627.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16627


commit 5fa096c3e11285429100155cdfeb8cc50ef653a8
Author: Shixiong Zhu 
Date:   2017-01-18T00:30:35Z

Fix a race condition when stopping StateStore



