Repository: spark
Updated Branches:
  refs/heads/master d54bfec2e -> b10837ab1


[SPARK-22557][TEST] Use ThreadSignaler explicitly

## What changes were proposed in this pull request?

ScalaTest 3.0 uses an implicit `Signaler`. This PR makes it sure all Spark 
tests uses `ThreadSignaler` explicitly which has the same default behavior of 
interrupting a thread on the JVM like ScalaTest 2.2.x. This will reduce 
potential flakiness.

## How was this patch tested?

This is testsuite-only update. This should passes the Jenkins tests.

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #19784 from dongjoon-hyun/use_thread_signaler.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b10837ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b10837ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b10837ab

Branch: refs/heads/master
Commit: b10837ab1a7bef04bf7a2773b9e44ed9206643fe
Parents: d54bfec
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Mon Nov 20 13:32:01 2017 +0900
Committer: hyukjinkwon <gurwls...@gmail.com>
Committed: Mon Nov 20 13:32:01 2017 +0900

----------------------------------------------------------------------
 .../test/scala/org/apache/spark/DistributedSuite.scala    |  7 +++++--
 core/src/test/scala/org/apache/spark/DriverSuite.scala    |  5 ++++-
 core/src/test/scala/org/apache/spark/UnpersistSuite.scala |  8 ++++++--
 .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala  |  9 ++++++++-
 .../scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala |  5 ++++-
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala    |  5 ++++-
 .../OutputCommitCoordinatorIntegrationSuite.scala         |  5 ++++-
 .../org/apache/spark/storage/BlockManagerSuite.scala      | 10 ++++++++--
 .../test/scala/org/apache/spark/util/EventLoopSuite.scala |  5 ++++-
 .../execution/streaming/ProcessingTimeExecutorSuite.scala |  8 +++++---
 .../scala/org/apache/spark/sql/streaming/StreamTest.scala |  2 ++
 .../org/apache/spark/sql/hive/SparkSubmitTestUtils.scala  |  5 ++++-
 .../scala/org/apache/spark/streaming/ReceiverSuite.scala  |  5 +++--
 .../apache/spark/streaming/StreamingContextSuite.scala    |  5 +++--
 .../spark/streaming/receiver/BlockGeneratorSuite.scala    |  7 ++++---
 15 files changed, 68 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala 
b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index f800561..ea9f6d2 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import org.scalatest.Matchers
-import org.scalatest.concurrent.TimeLimits._
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.{Millis, Span}
 
 import org.apache.spark.security.EncryptionFunSuite
@@ -30,7 +30,10 @@ class NotSerializableExn(val notSer: NotSerializableClass) 
extends Throwable() {
 
 
 class DistributedSuite extends SparkFunSuite with Matchers with 
LocalSparkContext
-  with EncryptionFunSuite {
+  with EncryptionFunSuite with TimeLimits {
+
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
 
   val clusterUrl = "local-cluster[2,1,1024]"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/DriverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala 
b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index be80d27..962945e 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.io.File
 
-import org.scalatest.concurrent.TimeLimits
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.prop.TableDrivenPropertyChecks._
 import org.scalatest.time.SpanSugar._
 
@@ -27,6 +27,9 @@ import org.apache.spark.util.Utils
 
 class DriverSuite extends SparkFunSuite with TimeLimits {
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
   ignore("driver should exit after finishing without cleanup (SPARK-530)") {
     val sparkHome = sys.props.getOrElse("spark.test.home", 
fail("spark.test.home is not set!"))
     val masters = Table("master", "local", "local-cluster[2,1,1024]")

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala 
b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
index bc3f58c..b58a3eb 100644
--- a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
@@ -17,10 +17,14 @@
 
 package org.apache.spark
 
-import org.scalatest.concurrent.TimeLimits._
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.{Millis, Span}
 
-class UnpersistSuite extends SparkFunSuite with LocalSparkContext {
+class UnpersistSuite extends SparkFunSuite with LocalSparkContext with 
TimeLimits {
+
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
   test("unpersist RDD") {
     sc = new SparkContext("local", "test")
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index cfbf56f..d0a34c5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -31,7 +31,7 @@ import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path}
 import org.scalatest.{BeforeAndAfterEach, Matchers}
-import org.scalatest.concurrent.TimeLimits
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
@@ -102,6 +102,9 @@ class SparkSubmitSuite
 
   import SparkSubmitSuite._
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
   override def beforeEach() {
     super.beforeEach()
     System.setProperty("spark.testing", "true")
@@ -1016,6 +1019,10 @@ class SparkSubmitSuite
 }
 
 object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
+
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use 
sparingly.
   def runSparkSubmit(args: Seq[String], root: String = ".."): Unit = {
     val sparkHome = sys.props.getOrElse("spark.test.home", 
fail("spark.test.home is not set!"))

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index de0e71a..24b0144 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.Duration
 
 import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.TimeLimits
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
@@ -34,6 +34,9 @@ class AsyncRDDActionsSuite extends SparkFunSuite with 
BeforeAndAfterAll with Tim
 
   @transient private var sc: SparkContext = _
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
   override def beforeAll() {
     super.beforeAll()
     sc = new SparkContext("local[2]", "test")

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 6222e57..d395e09 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, 
HashSet, Map}
 import scala.language.reflectiveCalls
 import scala.util.control.NonFatal
 
-import org.scalatest.concurrent.TimeLimits
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
@@ -102,6 +102,9 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 
   import DAGSchedulerSuite._
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
   val conf = new SparkConf
   /** Set of TaskSets the DAGScheduler has requested executed. */
   val taskSets = scala.collection.mutable.Buffer[TaskSet]()

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
index a27dadc..d6ff5bb 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler
 
 import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
-import org.scalatest.concurrent.TimeLimits
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.{Seconds, Span}
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite, TaskContext}
@@ -34,6 +34,9 @@ class OutputCommitCoordinatorIntegrationSuite
   with LocalSparkContext
   with TimeLimits {
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     val conf = new SparkConf()

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d45c194..f3e8a2e 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,8 +31,8 @@ import org.apache.commons.lang3.RandomUtils
 import org.mockito.{Matchers => mc}
 import org.mockito.Mockito.{mock, times, verify, when}
 import org.scalatest._
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.concurrent.Eventually._
-import org.scalatest.concurrent.TimeLimits._
 
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
@@ -57,10 +57,13 @@ import org.apache.spark.util.io.ChunkedByteBuffer
 
 class BlockManagerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfterEach
   with PrivateMethodTester with LocalSparkContext with ResetSystemProperties
-  with EncryptionFunSuite {
+  with EncryptionFunSuite with TimeLimits {
 
   import BlockManagerSuite._
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
   var conf: SparkConf = null
   var store: BlockManager = null
   var store2: BlockManager = null
@@ -1450,6 +1453,9 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 
 private object BlockManagerSuite {
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
   private implicit class BlockManagerTestUtils(store: BlockManager) {
 
     def dropFromMemoryIfExists(

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala 
b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
index f4f8388..5507457 100644
--- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
@@ -23,13 +23,16 @@ import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.concurrent.Eventually._
-import org.scalatest.concurrent.TimeLimits
 
 import org.apache.spark.SparkFunSuite
 
 class EventLoopSuite extends SparkFunSuite with TimeLimits {
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
   test("EventLoop") {
     val buffer = new ConcurrentLinkedQueue[Int]
     val eventLoop = new EventLoop[Int]("test") {

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
index 519e3c0..80c7691 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
@@ -22,16 +22,18 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.mutable
 
 import org.eclipse.jetty.util.ConcurrentHashSet
-import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.{Eventually, Signaler, ThreadSignaler, 
TimeLimits}
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
-import org.scalatest.concurrent.TimeLimits._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.streaming.ProcessingTime
 import org.apache.spark.sql.streaming.util.StreamManualClock
 
-class ProcessingTimeExecutorSuite extends SparkFunSuite {
+class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
+
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
 
   val timeout = 10.seconds
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 70b39b9..e68fca0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -69,7 +69,9 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
  */
 trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with 
BeforeAndAfterAll {
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler
+
   override def afterAll(): Unit = {
     super.afterAll()
     StateStore.stop() // stop the state store maintenance thread and unload 
store providers

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
index ede44df..68ed97d 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
@@ -23,7 +23,7 @@ import java.util.Date
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.scalatest.concurrent.TimeLimits
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.SpanSugar._
 
@@ -33,6 +33,9 @@ import org.apache.spark.util.Utils
 
 trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use 
sparingly.
   // This is copied from org.apache.spark.deploy.SparkSubmitSuite
   protected def runSparkSubmit(args: Seq[String], sparkHomeOpt: Option[String] 
= None): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 5fc626c..145c48e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -38,6 +38,9 @@ import org.apache.spark.util.Utils
 /** Testsuite for testing the network receiver behavior */
 class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val signaler: Signaler = ThreadSignaler
+
   test("receiver life cycle") {
 
     val receiver = new FakeReceiver
@@ -60,8 +63,6 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits 
with Serializable {
 
     // Verify that the receiver
     intercept[Exception] {
-      // Necessary to make failAfter interrupt awaitTermination() in ScalaTest 
3.x
-      implicit val signaler: Signaler = ThreadSignaler
       failAfter(200 millis) {
         executingThread.join()
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 5810e73..52c8959 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -44,6 +44,9 @@ import org.apache.spark.util.Utils
 
 class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with 
TimeLimits with Logging {
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
+  implicit val signaler: Signaler = ThreadSignaler
+
   val master = "local[2]"
   val appName = this.getClass.getSimpleName
   val batchDuration = Milliseconds(500)
@@ -406,8 +409,6 @@ class StreamingContextSuite extends SparkFunSuite with 
BeforeAndAfter with TimeL
 
     // test whether awaitTermination() does not exit if not time is given
     val exception = intercept[Exception] {
-      // Necessary to make failAfter interrupt awaitTermination() in ScalaTest 
3.x
-      implicit val signaler: Signaler = ThreadSignaler
       failAfter(1000 millis) {
         ssc.awaitTermination()
         throw new Exception("Did not wait for stop")

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
index 898da44..580f831 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
@@ -24,18 +24,19 @@ import scala.collection.mutable
 
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers._
-import org.scalatest.concurrent.{Signaler, ThreadSignaler}
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.concurrent.Eventually._
-import org.scalatest.concurrent.TimeLimits._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.util.ManualClock
 
-class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
+class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with 
TimeLimits {
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like 
ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler
+
   private val blockIntervalMs = 10
   private val conf = new SparkConf().set("spark.streaming.blockInterval", 
s"${blockIntervalMs}ms")
   @volatile private var blockGenerator: BlockGenerator = null


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to