[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2015-01-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r23372367
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,63 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  // Guarded by lock
   private var error: Throwable = null
-  private var stopped: Boolean = false
 
-  def notifyError(e: Throwable) = synchronized {
-    error = e
-    notifyAll()
-  }
+  // Guarded by lock
+  private var stopped: Boolean = false
 
-  def notifyStop() = synchronized {
-    stopped = true
-    notifyAll()
+  def notifyError(e: Throwable): Unit = {
+    lock.lock()
+    try {
+      error = e
+      condition.signalAll()
+    } finally {
+      lock.unlock()
+    }
   }
 
-  def waitForStopOrError(timeout: Long = -1) = synchronized {
-    // If already had error, then throw it
-    if (error != null) {
-      throw error
+  def notifyStop(): Unit = {
+    lock.lock()
+    try {
+      stopped = true
+      condition.signalAll()
+    } finally {
+      lock.unlock()
     }
+  }
 
-    // If not already stopped, then wait
-    if (!stopped) {
-      if (timeout < 0) wait() else wait(timeout)
+  /**
+   * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
+   * `false` if the waiting time detectably elapsed before return from the method.
+   */
+  def waitForStopOrError(timeout: Long = -1): Boolean = {
--- End diff --

> In hindsight, instead of modeling awaitTermination against Akka 
> ActorSystem's awaitTermination (which returns Unit), I should have modeled it 
> like Java ExecutorService's awaitTermination, which returns a Boolean. Now it's 
> not possible to change the API without breaking compatibility. :(

@tdas, sorry that I forgot to reply to you. You said you designed it just like 
Akka `ActorSystem.awaitTermination`. But 
[ActorSystem.awaitTermination](https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L394) 
throws a TimeoutException on timeout:

```Scala
  /**
   * Block current thread until the system has been shutdown, or the specified
   * timeout has elapsed. This will block until after all on termination
   * callbacks have been run.
   *
   * @throws TimeoutException in case of timeout
   */
  def awaitTermination(timeout: Duration): Unit
```
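
For comparison, a Boolean-returning wait can be adapted to the throw-on-timeout style documented above. This is only a sketch: `ThrowOnTimeoutSketch`, `awaitOrThrow`, and the `waitFor` parameter are hypothetical names, not part of Spark or Akka.

```scala
import java.util.concurrent.TimeoutException

// Hypothetical adapter; none of these names exist in Spark or Akka.
object ThrowOnTimeoutSketch {
  /**
   * Calls `waitFor(timeoutMs)`, which is assumed to return `true` when the
   * awaited event happened and `false` when the wait timed out, and turns
   * the `false` case into a TimeoutException.
   */
  def awaitOrThrow(timeoutMs: Long)(waitFor: Long => Boolean): Unit = {
    if (!waitFor(timeoutMs)) {
      throw new TimeoutException(s"Not terminated after $timeoutMs ms")
    }
  }
}
```

The reverse adaptation (catching the exception and returning `false`) is just as short, so the real question is which behavior existing callers already depend on.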



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-30 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-68406636
  
Merging this. Thanks!



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/3661



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r22328459
  
I think that will break a lot of existing programs. Often (even in our own 
test cases) `awaitTermination(timeout)` is used to wait for a short period of 
time before checking status or something. Currently, a timeout returns quietly. 
If it starts throwing exceptions instead, it will completely break some 
applications. So I strongly advise against 
changing this behavior. 
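
To make the concern concrete, here is a minimal sketch of the polling pattern described above. `AwaitableSketch` is a hypothetical stand-in for `StreamingContext`, and `pollUntil` is an assumed helper, not code from Spark or its tests.

```scala
// Hypothetical minimal interface standing in for StreamingContext.
trait AwaitableSketch {
  /** Waits up to `timeout` ms and returns quietly on timeout. */
  def awaitTermination(timeout: Long): Unit
}

object PollingSketch {
  /**
   * Polls until `done()` is true or `maxPolls` is reached.
   * Returns the number of polls that were made.
   */
  def pollUntil(ctx: AwaitableSketch, done: () => Boolean, maxPolls: Int): Int = {
    var polls = 0
    while (!done() && polls < maxPolls) {
      // A timeout here is the normal case, not an error.
      ctx.awaitTermination(10)
      polls += 1
    }
    polls
  }
}
```

If `awaitTermination(timeout)` threw on timeout, a loop like this would abort on its first iteration unless every caller added a `try`/`catch`.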



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r22328749
  
In hindsight, modeling awaitTermination on Akka 
[ActorSystem](http://doc.akka.io/api/akka/2.0/akka/actor/ActorSystem.html)'s 
awaitTermination (which returns Unit) was the wrong idea. I should have done it 
like Java ExecutorService's awaitTermination, which returns a Boolean. Now it's 
not possible to change the API without breaking compatibility. :(
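
For reference, a small sketch of the `ExecutorService.awaitTermination` behavior mentioned here: it returns `false` on timeout and `true` once the executor has terminated. The object name, method name, and timings are assumptions chosen for illustration.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical demo object; only the java.util.concurrent calls are real APIs.
object ExecutorAwaitSketch {
  /** Returns (result of a short wait, result of a long wait). */
  def demo(): (Boolean, Boolean) = {
    val pool = Executors.newSingleThreadExecutor()
    pool.execute(new Runnable {
      override def run(): Unit = Thread.sleep(300)
    })
    pool.shutdown()
    // The task sleeps for 300 ms, so a 20 ms wait is expected to time out.
    val early = pool.awaitTermination(20, TimeUnit.MILLISECONDS)
    // A generous second wait lets the task finish.
    val late = pool.awaitTermination(5, TimeUnit.SECONDS)
    (early, late)
  }
}
```

The caller learns the outcome from the return value, so a timeout needs no exception handling.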



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-29 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-68304161
  
This almost looks good. Can you add a unit test in the 
`StreamingContextSuite` to test the new public API `isTerminated` (with and 
without error)? You could update the existing unit tests.



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-29 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-68304935
  
Actually, correction. Can you put the public API `isTerminated` on a 
different PR? I want to separate the addition to public API (which requires a 
design discussion) from this bug fix PR. 



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-68323687
  
  [Test build #24876 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24876/consoleFull) for PR 3661 at commit [`52247f5`](https://github.com/apache/spark/commit/52247f5ff48f1fdf285daac20846c7587a30f340).
 * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-29 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-68323743
  
I deleted the commit about `isTerminated`. It will be sent in a different 
PR.



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-68326339
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24876/
Test FAILed.



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-68326338
  
  [Test build #24876 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24876/consoleFull) for PR 3661 at commit [`52247f5`](https://github.com/apache/spark/commit/52247f5ff48f1fdf285daac20846c7587a30f340).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-29 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-68328777
  
Jenkins, retest this please.



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-68328870
  
  [Test build #24881 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24881/consoleFull) for PR 3661 at commit [`52247f5`](https://github.com/apache/spark/commit/52247f5ff48f1fdf285daac20846c7587a30f340).
 * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-68331881
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24881/
Test PASSed.



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-68331877
  
  [Test build #24881 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24881/consoleFull) for PR 3661 at commit [`52247f5`](https://github.com/apache/spark/commit/52247f5ff48f1fdf285daac20846c7587a30f340).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r22272234
  
How about making `awaitTermination` throw a TimeoutException on timeout? It 
looks like a better API.



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r22277996
  
@tdas what do you think?



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-23 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r22230703
  
It seems a little counterintuitive that `isTerminated` would have three 
outputs: `true`, `false`, or `Exception`.  The alternative is a bit weird, 
though: we'd have to have something like `isTerminatedWithError` or `hasError` 
to distinguish between the two cases.

Sorry for being so nitpicky here, but I'm just trying to reason out what's 
the most consistent public API for this use case.
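
A rough sketch of the two shapes being weighed here, assuming a simple state holder. `TerminationStateSketch` and `isTerminatedOrThrow` are hypothetical names; `isTerminated` and `hasError` follow the names floated above, and the choice that `isTerminated` is `true` after an error is my assumption.

```scala
// Hypothetical state holder used only to compare the two API shapes.
class TerminationStateSketch {
  private var error: Option[Throwable] = None
  private var stopped = false

  def notifyError(e: Throwable): Unit = synchronized { error = Some(e) }
  def notifyStop(): Unit = synchronized { stopped = true }

  // Shape 1: one call with three outcomes (true, false, or a thrown error).
  def isTerminatedOrThrow: Boolean = synchronized {
    error.foreach(e => throw e)
    stopped
  }

  // Shape 2: two plain Boolean queries; the caller checks hasError separately.
  def isTerminated: Boolean = synchronized { stopped || error.isDefined }
  def hasError: Boolean = synchronized { error.isDefined }
}
```

Shape 2 never throws from a query method, at the cost of a second call to tell a clean stop from a failure.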



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r22150536
  
One thought: what happens if `awaitTermination` times out, an error is 
thrown, and then we check `isTerminated`?  In that (rare) case, I guess we 
might miss the error-causing exception.  I suppose the caller could do some 
sort of double-checking, like

```scala
waiter.awaitTermination(1000)
if (!waiter.isTerminated) {
  throw new Exception(...)
} else {
  waiter.awaitTermination(1)  // re-throws the error, if one occurred
}
```

Maybe this is a super-rare edge-case which we don't care about.



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r22152847
  
> what happens if awaitTermination times out, an error is thrown, and then 
> we check isTerminated?

How about throwing the error in `isTerminated` in such a case?



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-17 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r22007092
  
I considered whether this should throw `TimeoutException` instead of 
signaling failure via a boolean, but I guess this is sort of like 
`Object.wait(timeout)` or `Condition.await(timeout, unit)`, neither of which 
throws an exception when the timeout elapses.

Also, it looks like the usages of this in `StreamingContext` support this 
theory:

```scala
  /**
   * Wait for the execution to stop. Any exceptions that occurs during the execution
   * will be thrown in this thread.
   */
  def awaitTermination() {
    waiter.waitForStopOrError()
  }

  /**
   * Wait for the execution to stop. Any exceptions that occurs during the execution
   * will be thrown in this thread.
   * @param timeout time to wait in milliseconds
   */
  def awaitTermination(timeout: Long) {
    waiter.waitForStopOrError(timeout)
  }
```
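
A caller that does prefer an exception can layer one on top of the Boolean result. The following is a minimal sketch under assumptions: `TimeoutAdapter`, `awaitOrThrow` and `waitFn` are hypothetical names used only for illustration, with `waitFn` standing in for a Boolean-returning wait such as `waitForStopOrError(timeout)`.

```scala
import java.util.concurrent.TimeoutException

object TimeoutAdapter {
  // `waitFn` is a hypothetical stand-in for a Boolean-returning wait:
  // true means "stopped", false means "the timeout elapsed first".
  def awaitOrThrow(timeoutMs: Long)(waitFn: Long => Boolean): Unit = {
    if (!waitFn(timeoutMs)) {
      throw new TimeoutException(s"Not stopped after $timeoutMs ms")
    }
  }
}
```

Keeping the Boolean at the lowest level leaves the choice between a return value and an exception to each call site.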





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-17 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r22007180
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,63 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  // Guarded by lock
   private var error: Throwable = null
-  private var stopped: Boolean = false
 
-  def notifyError(e: Throwable) = synchronized {
-    error = e
-    notifyAll()
-  }
+  // Guarded by lock
+  private var stopped: Boolean = false
 
-  def notifyStop() = synchronized {
-    stopped = true
-    notifyAll()
+  def notifyError(e: Throwable): Unit = {
+    lock.lock()
+    try {
+      error = e
+      condition.signalAll()
+    } finally {
+      lock.unlock()
+    }
   }
 
-  def waitForStopOrError(timeout: Long = -1) = synchronized {
-    // If already had error, then throw it
-    if (error != null) {
-      throw error
+  def notifyStop(): Unit = {
+    lock.lock()
+    try {
+      stopped = true
+      condition.signalAll()
+    } finally {
+      lock.unlock()
     }
+  }
 
-    // If not already stopped, then wait
-    if (!stopped) {
-      if (timeout < 0) wait() else wait(timeout)
+  /**
+   * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
+   * `false` if the waiting time detectably elapsed before return from the method.
+   */
+  def waitForStopOrError(timeout: Long = -1): Boolean = {
--- End diff --

I suppose the documentation for 
`StreamingContext.awaitTermination(timeout)` could be improved to convey what 
happens when the timeout occurs...





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-17 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-67400892
  
This looks good to me.





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r22020241
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,63 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  // Guarded by lock
   private var error: Throwable = null
-  private var stopped: Boolean = false
 
-  def notifyError(e: Throwable) = synchronized {
-    error = e
-    notifyAll()
-  }
+  // Guarded by lock
+  private var stopped: Boolean = false
 
-  def notifyStop() = synchronized {
-    stopped = true
-    notifyAll()
+  def notifyError(e: Throwable): Unit = {
+    lock.lock()
+    try {
+      error = e
+      condition.signalAll()
+    } finally {
+      lock.unlock()
+    }
   }
 
-  def waitForStopOrError(timeout: Long = -1) = synchronized {
-    // If already had error, then throw it
-    if (error != null) {
-      throw error
+  def notifyStop(): Unit = {
+    lock.lock()
+    try {
+      stopped = true
+      condition.signalAll()
+    } finally {
+      lock.unlock()
     }
+  }
 
-    // If not already stopped, then wait
-    if (!stopped) {
-      if (timeout < 0) wait() else wait(timeout)
+  /**
+   * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
+   * `false` if the waiting time detectably elapsed before return from the method.
+   */
+  def waitForStopOrError(timeout: Long = -1): Boolean = {
--- End diff --

Do you think it would be better to return a `Boolean` to indicate whether the 
wait timed out, even though that would break source compatibility?





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-17 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r22020714
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,63 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  // Guarded by lock
   private var error: Throwable = null
-  private var stopped: Boolean = false
 
-  def notifyError(e: Throwable) = synchronized {
-    error = e
-    notifyAll()
-  }
+  // Guarded by lock
+  private var stopped: Boolean = false
 
-  def notifyStop() = synchronized {
-    stopped = true
-    notifyAll()
+  def notifyError(e: Throwable): Unit = {
+    lock.lock()
+    try {
+      error = e
+      condition.signalAll()
+    } finally {
+      lock.unlock()
+    }
   }
 
-  def waitForStopOrError(timeout: Long = -1) = synchronized {
-    // If already had error, then throw it
-    if (error != null) {
-      throw error
+  def notifyStop(): Unit = {
+    lock.lock()
+    try {
+      stopped = true
+      condition.signalAll()
+    } finally {
+      lock.unlock()
     }
+  }
 
-    // If not already stopped, then wait
-    if (!stopped) {
-      if (timeout < 0) wait() else wait(timeout)
+  /**
+   * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
+   * `false` if the waiting time detectably elapsed before return from the method.
+   */
+  def waitForStopOrError(timeout: Long = -1): Boolean = {
--- End diff --

Another option would be to just add a new `def isTerminated: Boolean` 
method, which would let users write code like

```
waiter.awaitTermination(1000)
if (!waiter.isTerminated) {
  throw new Exception(...)
}
```
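
To make the trade-off concrete, here is a minimal self-contained sketch of that option. It is an illustration under assumptions, not code from the PR: `TerminationWaiter` is a hypothetical stand-in, and it uses a `CountDownLatch` purely to keep the example short. Only the names `awaitTermination` and `isTerminated` come from the comment above.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical stand-in for the waiter; the latch-based body is assumed.
class TerminationWaiter {
  private val done = new CountDownLatch(1)

  def notifyStop(): Unit = done.countDown()

  // Keeps the existing Unit-returning signature, so callers are unaffected.
  def awaitTermination(timeoutMs: Long): Unit = {
    done.await(timeoutMs, TimeUnit.MILLISECONDS)
    ()
  }

  // New query method: callers ask afterwards whether termination happened.
  def isTerminated: Boolean = done.getCount == 0
}
```

The wait and the check are two separate calls, so termination can occur between them; in that case `isTerminated` simply reports `true`, which is harmless for this usage.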





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r22021962
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,63 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  // Guarded by lock
   private var error: Throwable = null
-  private var stopped: Boolean = false
 
-  def notifyError(e: Throwable) = synchronized {
-    error = e
-    notifyAll()
-  }
+  // Guarded by lock
+  private var stopped: Boolean = false
 
-  def notifyStop() = synchronized {
-    stopped = true
-    notifyAll()
+  def notifyError(e: Throwable): Unit = {
+    lock.lock()
+    try {
+      error = e
+      condition.signalAll()
+    } finally {
+      lock.unlock()
+    }
   }
 
-  def waitForStopOrError(timeout: Long = -1) = synchronized {
-    // If already had error, then throw it
-    if (error != null) {
-      throw error
+  def notifyStop(): Unit = {
+    lock.lock()
+    try {
+      stopped = true
+      condition.signalAll()
+    } finally {
+      lock.unlock()
     }
+  }
 
-    // If not already stopped, then wait
-    if (!stopped) {
-      if (timeout < 0) wait() else wait(timeout)
+  /**
+   * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
+   * `false` if the waiting time detectably elapsed before return from the method.
+   */
+  def waitForStopOrError(timeout: Long = -1): Boolean = {
--- End diff --

Agreed. Done.





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-17 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-67436556
  
  [Test build #24568 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24568/consoleFull)
 for   PR 3661 at commit 
[`15357d2`](https://github.com/apache/spark/commit/15357d2046f3a35a3bfbf9fecde5c4036f703c87).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-17 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-67441156
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24568/
Test PASSed.





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-17 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-67441152
  
  [Test build #24568 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24568/consoleFull)
 for   PR 3661 at commit 
[`15357d2`](https://github.com/apache/spark/commit/15357d2046f3a35a3bfbf9fecde5c4036f703c87).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class Analyzer(catalog: Catalog, registry: FunctionRegistry, 
caseSensitive: Boolean)`






[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-16 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r21931348
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,63 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  // Guarded by lock
   private var error: Throwable = null
-  private var stopped: Boolean = false
 
-  def notifyError(e: Throwable) = synchronized {
-    error = e
-    notifyAll()
-  }
+  // Guarded by lock
+  private var stopped: Boolean = false
 
-  def notifyStop() = synchronized {
-    stopped = true
-    notifyAll()
+  def notifyError(e: Throwable) = {
--- End diff --

Minor nit, but mind adding `: Unit` here?





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-16 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r21931382
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,63 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  // Guarded by lock
   private var error: Throwable = null
-  private var stopped: Boolean = false
 
-  def notifyError(e: Throwable) = synchronized {
-    error = e
-    notifyAll()
-  }
+  // Guarded by lock
+  private var stopped: Boolean = false
 
-  def notifyStop() = synchronized {
-    stopped = true
-    notifyAll()
+  def notifyError(e: Throwable) = {
+    lock.lock()
+    try {
+      error = e
+      condition.signalAll()
+    } finally {
+      lock.unlock()
+    }
   }
 
-  def waitForStopOrError(timeout: Long = -1) = synchronized {
-    // If already had error, then throw it
-    if (error != null) {
-      throw error
+  def notifyStop() = {
--- End diff --

Same here: could this have an explicit `: Unit` type?





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-16 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r21931523
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,74 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.locks.ReentrantLock
+import javax.annotation.concurrent.GuardedBy
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  @GuardedBy(lock)
--- End diff --

In general, I'm a fan of the FindBugs annotations.  I've had trouble 
getting the various analysis tools to work well with them in Scala, though.  +1 
to this commenting convention, though; this is very helpful.





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-16 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-67263837
  
> Maybe a naive question, but is there a reason why we can't just use 
> synchronized methods here?

I wrote the reason in the description: I used `Condition` to rewrite 
`ContextWaiter` because it provides a convenient API, `awaitNanos`, for timeouts.
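
What makes `awaitNanos` convenient is that it returns an estimate of the time still remaining, so the wait can be retried in a loop without recomputing a deadline. A minimal sketch, assuming a hypothetical `Flag` class that is not part of Spark:

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Hypothetical helper for illustration; `Flag` and its methods are assumed names.
class Flag {
  private val lock = new ReentrantLock()
  private val condition = lock.newCondition()

  // Guarded by lock
  private var raised: Boolean = false

  def raise(): Unit = {
    lock.lock()
    try {
      raised = true
      condition.signalAll()
    } finally {
      lock.unlock()
    }
  }

  /** Returns true if the flag was raised within `timeoutMs`, false otherwise. */
  def awaitRaised(timeoutMs: Long): Boolean = {
    lock.lock()
    try {
      var nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs)
      // awaitNanos returns the remaining budget, so a spurious wakeup just
      // re-enters the loop with less time left.
      while (!raised && nanos > 0) {
        nanos = condition.awaitNanos(nanos)
      }
      raised
    } finally {
      lock.unlock()
    }
  }
}
```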


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-67264712
  
  [Test build #24521 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24521/consoleFull)
 for   PR 3661 at commit 
[`52247f5`](https://github.com/apache/spark/commit/52247f5ff48f1fdf285daac20846c7587a30f340).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-67270689
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24521/
Test PASSed.





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-67270683
  
  [Test build #24521 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24521/consoleFull)
 for   PR 3661 at commit 
[`52247f5`](https://github.com/apache/spark/commit/52247f5ff48f1fdf285daac20846c7587a30f340).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class Analyzer(catalog: Catalog, registry: FunctionRegistry, 
caseSensitive: Boolean)`






[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/3661

[SPARK-4813][Streaming] Fix the issue that ContextWaiter didn't handle 
'spurious wakeup'

Used `Condition` to rewrite `ContextWaiter` because it provides a 
convenient API `awaitNanos` for timeout.
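
For context: `Object.wait` may return without any notification (a spurious wakeup), so the awaited predicate has to be re-checked in a loop. The sketch below shows roughly what that takes with plain monitors, including the manual deadline bookkeeping that `awaitNanos` avoids. It is an illustration under assumptions, not the code in this pull request; `SynchronizedWaiter` and `timeoutMs` are hypothetical names.

```scala
// Hypothetical sketch; the class name and deadline handling are assumptions.
class SynchronizedWaiter {
  private var error: Throwable = null
  private var stopped: Boolean = false

  def notifyError(e: Throwable): Unit = synchronized {
    error = e
    notifyAll()
  }

  def notifyStop(): Unit = synchronized {
    stopped = true
    notifyAll()
  }

  /** Returns true if stopped, false on timeout; rethrows a reported error. */
  def waitForStopOrError(timeoutMs: Long = -1): Boolean = synchronized {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var timedOut = false
    // Re-check the predicate after every wakeup, since wait() may return spuriously.
    while (!stopped && error == null && !timedOut) {
      if (timeoutMs < 0) {
        wait()
      } else {
        val remainingMs = (deadline - System.nanoTime()) / 1000000L
        // wait(0) would block forever, so treat "no time left" as a timeout.
        if (remainingMs <= 0) timedOut = true else wait(remainingMs)
      }
    }
    if (error != null) throw error
    stopped
  }
}
```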

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-4813

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3661.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3661


commit e06bd4fdc7d052ef55e2d98e68441586fe9d2026
Author: zsxwing zsxw...@gmail.com
Date:   2014-12-10T08:25:39Z

Fix the issue that ContextWaiter didn't handle 'spurious wakeup'







[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-66421083
  
  [Test build #24302 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24302/consoleFull)
 for   PR 3661 at commit 
[`e06bd4f`](https://github.com/apache/spark/commit/e06bd4fdc7d052ef55e2d98e68441586fe9d2026).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r21591585
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,74 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.locks.ReentrantLock
+import javax.annotation.concurrent.GuardedBy
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  @GuardedBy(lock)
--- End diff --

Minor point: these are not in the JDK but in a FindBugs library for 
JSR-305. The library happens to be a dependency now, but its annotations are 
not used elsewhere in Spark. Maybe not worth using it in just one place?





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r21591750
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,74 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.locks.ReentrantLock
+import javax.annotation.concurrent.GuardedBy
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  @GuardedBy(lock)
   private var error: Throwable = null
+
+  @GuardedBy(lock)
   private var stopped: Boolean = false
 
-  def notifyError(e: Throwable) = synchronized {
-    error = e
-    notifyAll()
+  def notifyError(e: Throwable) = {
+    lock.lock()
+    try {
+      error = e
+      condition.signalAll()
+    } finally {
+      lock.unlock()
+    }
   }
 
-  def notifyStop() = synchronized {
-    stopped = true
-    notifyAll()
+  def notifyStop() = {
+    lock.lock()
+    try {
+      stopped = true
+      condition.signalAll()
+    } finally {
+      lock.unlock()
+    }
   }
 
-  def waitForStopOrError(timeout: Long = -1) = synchronized {
-    // If already had error, then throw it
-    if (error != null) {
-      throw error
-    }
+  /**
+   * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
+   * `false` if the waiting time detectably elapsed before return from the method.
+   */
+  def waitForStopOrError(timeout: Long = -1): Boolean = {
+    lock.lock()
+    try {
+      if (timeout < 0) {
+        while (true) {
--- End diff --

Maybe it's just me but it feels like these loops would be simpler just 
testing `while (!stopped && error == null)`? `nanos` would be tested in the 
other one too. This avoids duplication, and also avoids the unreachable return 
value, because you check these conditions in one place at the end.
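
A sketch of the loop shape being suggested, for illustration only: `ContextWaiterSketch` is a hypothetical name and this is not necessarily the code that was merged. Both loops test the same predicate, and the outcome is decided once after whichever loop exits.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Sketch only; the class name is hypothetical.
class ContextWaiterSketch {
  private val lock = new ReentrantLock()
  private val condition = lock.newCondition()

  // Guarded by lock
  private var error: Throwable = null

  // Guarded by lock
  private var stopped: Boolean = false

  def notifyError(e: Throwable): Unit = {
    lock.lock()
    try {
      error = e
      condition.signalAll()
    } finally {
      lock.unlock()
    }
  }

  def notifyStop(): Unit = {
    lock.lock()
    try {
      stopped = true
      condition.signalAll()
    } finally {
      lock.unlock()
    }
  }

  def waitForStopOrError(timeout: Long = -1): Boolean = {
    lock.lock()
    try {
      if (timeout < 0) {
        while (!stopped && error == null) {
          condition.await()
        }
      } else {
        var nanos = TimeUnit.MILLISECONDS.toNanos(timeout)
        while (!stopped && error == null && nanos > 0) {
          nanos = condition.awaitNanos(nanos)
        }
      }
      // Single exit point: rethrow a reported error, else report stop vs. timeout.
      if (error != null) throw error
      stopped
    } finally {
      lock.unlock()
    }
  }
}
```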


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r21592200
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,74 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.locks.ReentrantLock
+import javax.annotation.concurrent.GuardedBy
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  @GuardedBy(lock)
--- End diff --

> Maybe not worth using it just 1 place?

So which one do you prefer?
1. Use comments to describe such information.
2. Use `GuardedBy` from now on.






[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r21592261
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,74 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.locks.ReentrantLock
+import javax.annotation.concurrent.GuardedBy
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  @GuardedBy(lock)
--- End diff --

In addition, FindBugs currently does not recognize `GuardedBy` in Scala code.





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r21592650
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,74 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.locks.ReentrantLock
+import javax.annotation.concurrent.GuardedBy
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  @GuardedBy(lock)
--- End diff --

BTW, I turned to `GuardedBy` because @aarondav asked me to do it in #3634





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r21592824
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,74 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.locks.ReentrantLock
+import javax.annotation.concurrent.GuardedBy
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  @GuardedBy(lock)
--- End diff --

Yes, that's why I brought it up. `GuardedBy` is not actually a standard Java 
annotation (unless someone tells me it just turned up in Java 8 or something); 
it is part of JSR-305. JSR-305 is a dependency of Spark core at the moment, but 
none of its annotations are used. I think we should avoid the annotations 
altogether rather than use this library in just one place.
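
For illustration, a minimal sketch of the annotation-free alternative: the locking discipline is recorded in a plain comment instead of the JSR-305 `GuardedBy` annotation, so the class needs nothing beyond `java.util.concurrent`. The class `CommentGuardedFlag` and its methods are hypothetical names used only for this example; they are not part of the PR.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical example class (not from the PR): the guard is documented
// with a comment rather than the JSR-305 @GuardedBy annotation.
class CommentGuardedFlag {
  private val lock = new ReentrantLock()

  // Guarded by lock
  private var stopped: Boolean = false

  // Set the flag while holding the lock.
  def markStopped(): Unit = {
    lock.lock()
    try {
      stopped = true
    } finally {
      lock.unlock()
    }
  }

  // Read the flag while holding the lock.
  def isStopped: Boolean = {
    lock.lock()
    try {
      stopped
    } finally {
      lock.unlock()
    }
  }
}
```

The comment carries the same information for human readers; the trade-off is that no tool can check it.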





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3661#discussion_r21593635
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---
@@ -17,30 +17,74 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.locks.ReentrantLock
+import javax.annotation.concurrent.GuardedBy
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  @GuardedBy(lock)
   private var error: Throwable = null
+
+  @GuardedBy(lock)
   private var stopped: Boolean = false
 
-  def notifyError(e: Throwable) = synchronized {
-    error = e
-    notifyAll()
+  def notifyError(e: Throwable) = {
+    lock.lock()
+    try {
+      error = e
+      condition.signalAll()
+    } finally {
+      lock.unlock()
+    }
   }
 
-  def notifyStop() = synchronized {
-    stopped = true
-    notifyAll()
+  def notifyStop() = {
+    lock.lock()
+    try {
+      stopped = true
+      condition.signalAll()
+    } finally {
+      lock.unlock()
+    }
   }
 
-  def waitForStopOrError(timeout: Long = -1) = synchronized {
-    // If already had error, then throw it
-    if (error != null) {
-      throw error
-    }
+  /**
+   * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
+   * `false` if the waiting time detectably elapsed before return from the method.
+   */
+  def waitForStopOrError(timeout: Long = -1): Boolean = {
+    lock.lock()
+    try {
+      if (timeout < 0) {
+        while (true) {
--- End diff --

It's cleaner now.
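
A rough sketch of how a timed wait with the contract from the doc comment could be written on top of `ReentrantLock` and `Condition`. It assumes `timeout` is in milliseconds and that a negative value means wait indefinitely. `WaiterSketch` is a hypothetical name and this is not the code from the PR; it only illustrates the `true` / throw / `false` outcomes, re-checking the state in a loop so that spurious wakeups are tolerated.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Hypothetical sketch, not the code from the PR.
class WaiterSketch {
  private val lock = new ReentrantLock()
  private val condition = lock.newCondition()

  // Guarded by lock
  private var error: Throwable = null

  // Guarded by lock
  private var stopped: Boolean = false

  def notifyError(e: Throwable): Unit = {
    lock.lock()
    try {
      error = e
      condition.signalAll()
    } finally {
      lock.unlock()
    }
  }

  def notifyStop(): Unit = {
    lock.lock()
    try {
      stopped = true
      condition.signalAll()
    } finally {
      lock.unlock()
    }
  }

  // Assumption: timeout is in milliseconds; a negative value waits forever.
  def waitForStopOrError(timeout: Long = -1): Boolean = {
    lock.lock()
    try {
      if (timeout < 0) {
        // Loop guards against spurious wakeups.
        while (!stopped && error == null) {
          condition.await()
        }
      } else {
        var nanos = TimeUnit.MILLISECONDS.toNanos(timeout)
        while (!stopped && error == null && nanos > 0) {
          // awaitNanos returns an estimate of the remaining wait time.
          nanos = condition.awaitNanos(nanos)
        }
      }
      // A reported error takes precedence over the stopped flag.
      if (error != null) throw error
      // true if stopped, false if the timeout elapsed first.
      stopped
    } finally {
      lock.unlock()
    }
  }
}
```

A caller would invoke `waitForStopOrError(1000)` and treat a `false` result as a timeout, much like `ExecutorService.awaitTermination`.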





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-66429086
  
  [Test build #24305 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24305/consoleFull) for PR 3661 at commit [`be42bcf`](https://github.com/apache/spark/commit/be42bcfaa38a3f3fbe4fc759656a61c72f0fb556).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-66430212
  
  [Test build #24302 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24302/consoleFull) for PR 3661 at commit [`e06bd4f`](https://github.com/apache/spark/commit/e06bd4fdc7d052ef55e2d98e68441586fe9d2026).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-66430222
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24302/





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-66438151
  
  [Test build #24305 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24305/consoleFull) for PR 3661 at commit [`be42bcf`](https://github.com/apache/spark/commit/be42bcfaa38a3f3fbe4fc759656a61c72f0fb556).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-4813][Streaming] Fix the issue that Con...

2014-12-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3661#issuecomment-66438157
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24305/

