Repository: spark Updated Branches: refs/heads/master d54bfec2e -> b10837ab1
[SPARK-22557][TEST] Use ThreadSignaler explicitly ## What changes were proposed in this pull request? ScalaTest 3.0 uses an implicit `Signaler`. This PR makes sure all Spark tests use `ThreadSignaler` explicitly, which has the same default behavior of interrupting a thread on the JVM as ScalaTest 2.2.x. This will reduce potential flakiness. ## How was this patch tested? This is a testsuite-only update. It should pass the Jenkins tests. Author: Dongjoon Hyun <dongj...@apache.org> Closes #19784 from dongjoon-hyun/use_thread_signaler. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b10837ab Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b10837ab Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b10837ab Branch: refs/heads/master Commit: b10837ab1a7bef04bf7a2773b9e44ed9206643fe Parents: d54bfec Author: Dongjoon Hyun <dongj...@apache.org> Authored: Mon Nov 20 13:32:01 2017 +0900 Committer: hyukjinkwon <gurwls...@gmail.com> Committed: Mon Nov 20 13:32:01 2017 +0900 ---------------------------------------------------------------------- .../test/scala/org/apache/spark/DistributedSuite.scala | 7 +++++-- core/src/test/scala/org/apache/spark/DriverSuite.scala | 5 ++++- core/src/test/scala/org/apache/spark/UnpersistSuite.scala | 8 ++++++-- .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 9 ++++++++- .../scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala | 5 ++++- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 5 ++++- .../OutputCommitCoordinatorIntegrationSuite.scala | 5 ++++- .../org/apache/spark/storage/BlockManagerSuite.scala | 10 ++++++++-- .../test/scala/org/apache/spark/util/EventLoopSuite.scala | 5 ++++- .../execution/streaming/ProcessingTimeExecutorSuite.scala | 8 +++++--- .../scala/org/apache/spark/sql/streaming/StreamTest.scala | 2 ++ .../org/apache/spark/sql/hive/SparkSubmitTestUtils.scala | 5 ++++- 
.../scala/org/apache/spark/streaming/ReceiverSuite.scala | 5 +++-- .../apache/spark/streaming/StreamingContextSuite.scala | 5 +++-- .../spark/streaming/receiver/BlockGeneratorSuite.scala | 7 ++++--- 15 files changed, 68 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/DistributedSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index f800561..ea9f6d2 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark import org.scalatest.Matchers -import org.scalatest.concurrent.TimeLimits._ +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.{Millis, Span} import org.apache.spark.security.EncryptionFunSuite @@ -30,7 +30,10 @@ class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() { class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext - with EncryptionFunSuite { + with EncryptionFunSuite with TimeLimits { + + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler val clusterUrl = "local-cluster[2,1,1024]" http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/DriverSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala index be80d27..962945e 100644 --- a/core/src/test/scala/org/apache/spark/DriverSuite.scala +++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala 
@@ -19,7 +19,7 @@ package org.apache.spark import java.io.File -import org.scalatest.concurrent.TimeLimits +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.prop.TableDrivenPropertyChecks._ import org.scalatest.time.SpanSugar._ @@ -27,6 +27,9 @@ import org.apache.spark.util.Utils class DriverSuite extends SparkFunSuite with TimeLimits { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + ignore("driver should exit after finishing without cleanup (SPARK-530)") { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) val masters = Table("master", "local", "local-cluster[2,1,1024]") http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/UnpersistSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala index bc3f58c..b58a3eb 100644 --- a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala +++ b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala @@ -17,10 +17,14 @@ package org.apache.spark -import org.scalatest.concurrent.TimeLimits._ +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.{Millis, Span} -class UnpersistSuite extends SparkFunSuite with LocalSparkContext { +class UnpersistSuite extends SparkFunSuite with LocalSparkContext with TimeLimits { + + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + test("unpersist RDD") { sc = new SparkContext("local", "test") val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index cfbf56f..d0a34c5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -31,7 +31,7 @@ import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path} import org.scalatest.{BeforeAndAfterEach, Matchers} -import org.scalatest.concurrent.TimeLimits +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.SpanSugar._ import org.apache.spark._ @@ -102,6 +102,9 @@ class SparkSubmitSuite import SparkSubmitSuite._ + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + override def beforeEach() { super.beforeEach() System.setProperty("spark.testing", "true") @@ -1016,6 +1019,10 @@ class SparkSubmitSuite } object SparkSubmitSuite extends SparkFunSuite with TimeLimits { + + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. 
def runSparkSubmit(args: Seq[String], root: String = ".."): Unit = { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index de0e71a..24b0144 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.Duration import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.TimeLimits +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.SpanSugar._ import org.apache.spark._ @@ -34,6 +34,9 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim @transient private var sc: SparkContext = _ + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + override def beforeAll() { super.beforeAll() sc = new SparkContext("local[2]", "test") http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 6222e57..d395e09 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -25,7 +25,7 @@ import 
scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} import scala.language.reflectiveCalls import scala.util.control.NonFatal -import org.scalatest.concurrent.TimeLimits +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.SpanSugar._ import org.apache.spark._ @@ -102,6 +102,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi import DAGSchedulerSuite._ + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + val conf = new SparkConf /** Set of TaskSets the DAGScheduler has requested executed. */ val taskSets = scala.collection.mutable.Buffer[TaskSet]() http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala index a27dadc..d6ff5bb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext} -import org.scalatest.concurrent.TimeLimits +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.{Seconds, Span} import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TaskContext} @@ -34,6 +34,9 @@ class OutputCommitCoordinatorIntegrationSuite with LocalSparkContext with TimeLimits { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: 
Signaler = ThreadSignaler + override def beforeAll(): Unit = { super.beforeAll() val conf = new SparkConf() http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index d45c194..f3e8a2e 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -31,8 +31,8 @@ import org.apache.commons.lang3.RandomUtils import org.mockito.{Matchers => mc} import org.mockito.Mockito.{mock, times, verify, when} import org.scalatest._ +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.Eventually._ -import org.scalatest.concurrent.TimeLimits._ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager @@ -57,10 +57,13 @@ import org.apache.spark.util.io.ChunkedByteBuffer class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach with PrivateMethodTester with LocalSparkContext with ResetSystemProperties - with EncryptionFunSuite { + with EncryptionFunSuite with TimeLimits { import BlockManagerSuite._ + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + var conf: SparkConf = null var store: BlockManager = null var store2: BlockManager = null @@ -1450,6 +1453,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE private object BlockManagerSuite { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + private implicit class BlockManagerTestUtils(store: BlockManager) { def 
dropFromMemoryIfExists( http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala index f4f8388..5507457 100644 --- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala @@ -23,13 +23,16 @@ import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.language.postfixOps +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.Eventually._ -import org.scalatest.concurrent.TimeLimits import org.apache.spark.SparkFunSuite class EventLoopSuite extends SparkFunSuite with TimeLimits { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + test("EventLoop") { val buffer = new ConcurrentLinkedQueue[Int] val eventLoop = new EventLoop[Int]("test") { http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala index 519e3c0..80c7691 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala @@ -22,16 +22,18 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable import org.eclipse.jetty.util.ConcurrentHashSet 
-import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.{Eventually, Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.PatienceConfiguration.Timeout -import org.scalatest.concurrent.TimeLimits._ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.streaming.ProcessingTime import org.apache.spark.sql.streaming.util.StreamManualClock -class ProcessingTimeExecutorSuite extends SparkFunSuite { +class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits { + + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler val timeout = 10.seconds http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 70b39b9..e68fca0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -69,7 +69,9 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} */ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with BeforeAndAfterAll { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x implicit val defaultSignaler: Signaler = ThreadSignaler + override def afterAll(): Unit = { super.afterAll() StateStore.stop() // stop the state store maintenance thread and unload store providers http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala ---------------------------------------------------------------------- diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala index ede44df..68ed97d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala @@ -23,7 +23,7 @@ import java.util.Date import scala.collection.mutable.ArrayBuffer -import org.scalatest.concurrent.TimeLimits +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ @@ -33,6 +33,9 @@ import org.apache.spark.util.Utils trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. 
// This is copied from org.apache.spark.deploy.SparkSubmitSuite protected def runSparkSubmit(args: Seq[String], sparkHomeOpt: Option[String] = None): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index 5fc626c..145c48e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -38,6 +38,9 @@ import org.apache.spark.util.Utils /** Testsuite for testing the network receiver behavior */ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val signaler: Signaler = ThreadSignaler + test("receiver life cycle") { val receiver = new FakeReceiver @@ -60,8 +63,6 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { // Verify that the receiver intercept[Exception] { - // Necessary to make failAfter interrupt awaitTermination() in ScalaTest 3.x - implicit val signaler: Signaler = ThreadSignaler failAfter(200 millis) { executingThread.join() } http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 5810e73..52c8959 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -44,6 +44,9 @@ import org.apache.spark.util.Utils class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeLimits with Logging { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val signaler: Signaler = ThreadSignaler + val master = "local[2]" val appName = this.getClass.getSimpleName val batchDuration = Milliseconds(500) @@ -406,8 +409,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL // test whether awaitTermination() does not exit if not time is given val exception = intercept[Exception] { - // Necessary to make failAfter interrupt awaitTermination() in ScalaTest 3.x - implicit val signaler: Signaler = ThreadSignaler failAfter(1000 millis) { ssc.awaitTermination() throw new Exception("Did not wait for stop") http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala index 898da44..580f831 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala @@ -24,18 +24,19 @@ import scala.collection.mutable import org.scalatest.BeforeAndAfter import org.scalatest.Matchers._ -import org.scalatest.concurrent.{Signaler, ThreadSignaler} +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.Eventually._ -import org.scalatest.concurrent.TimeLimits._ import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import 
org.apache.spark.storage.StreamBlockId import org.apache.spark.util.ManualClock -class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { +class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLimits { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x implicit val defaultSignaler: Signaler = ThreadSignaler + private val blockIntervalMs = 10 private val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms") @volatile private var blockGenerator: BlockGenerator = null --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org