This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 0d26cfc  [SPARK-27460][TESTS][2.4] Running slowest test suites in their own forked JVMs for higher parallelism
0d26cfc is described below

commit 0d26cfcecd451a42a8d3eb14e6d66a88e955ce9e
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Thu Sep 19 21:03:24 2019 -0700

    [SPARK-27460][TESTS][2.4] Running slowest test suites in their own forked JVMs for higher parallelism

    ## What changes were proposed in this pull request?

    This is a backport of https://github.com/apache/spark/pull/24373,
    https://github.com/apache/spark/pull/24404, and https://github.com/apache/spark/pull/24434.

    This patch modifies SparkBuild so that the largest / slowest test suites (or collections of
    suites) can run in their own forked JVMs, allowing them to run in parallel with each other.
    This opt-in / whitelisting approach lets us increase parallelism without first having to fix
    a long tail of flakiness / brittleness issues in tests which aren't performance bottlenecks.
    See the comments in SparkBuild.scala for details, including a summary of why we sometimes
    opt to run entire groups of tests in a single forked JVM.

    The full pull request test time in Jenkins is reduced by around 53%:
    before changes: 4hr 40min
    after changes:  2hr 13min

    ## How was this patch tested?

    Unit test

    Closes #25861 from dongjoon-hyun/SPARK-27460.

    Lead-authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Co-authored-by: gatorsmile <gatorsm...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/ExecutorAllocationManagerSuite.scala     | 18 ++--
 .../org/apache/spark/SparkContextInfoSuite.scala   |  9 +-
 .../scala/org/apache/spark/SparkFunSuite.scala     | 46 ++++++++++-
 .../org/apache/spark/StatusTrackerSuite.scala      |  2 +-
 .../deploy/history/FsHistoryProviderSuite.scala    | 20 +++--
 .../scheduler/SparkListenerWithClusterSuite.scala  | 10 +--
 project/SparkBuild.scala                           | 95 +++++++++++++++++++++-
 .../spark/deploy/yarn/BaseYarnClusterSuite.scala   |  2 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala       |  4 +-
 .../org/apache/spark/sql/SQLQueryTestSuite.scala   |  5 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala   |  3 +-
 .../sql/streaming/FileStreamSourceSuite.scala      |  2 +-
 .../apache/spark/sql/streaming/StreamTest.scala    |  2 +-
 .../sql/streaming/StreamingQueryManagerSuite.scala | 17 ++--
 .../apache/spark/sql/test/SharedSparkSession.scala |  7 +-
 .../spark/sql/hive/client/HiveClientSuite.scala    |  2 +
 .../org/apache/spark/streaming/ReceiverSuite.scala |  2 +-
 17 files changed, 202 insertions(+), 44 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index a69045f..df5d265 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.mockito.Matchers.{any, eq => meq}
 import org.mockito.Mockito.{mock, never, verify, when}
-import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.config
@@ -37,20 +37,24 @@ import org.apache.spark.util.ManualClock
  */
 class ExecutorAllocationManagerSuite
   extends SparkFunSuite
-  with LocalSparkContext
-  with BeforeAndAfter {
+  with LocalSparkContext {
 
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._
 
   private val contexts = new mutable.ListBuffer[SparkContext]()
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     contexts.clear()
   }
 
-  after {
-    contexts.foreach(_.stop())
+  override def afterEach(): Unit = {
+    try {
+      contexts.foreach(_.stop())
+    } finally {
+      super.afterEach()
+    }
   }
 
   private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = {
@@ -281,7 +285,7 @@ class ExecutorAllocationManagerSuite
     assert(totalRunningTasks(manager) === 0)
   }
 
-  test("cancel pending executors when no longer needed") {
+  testRetry("cancel pending executors when no longer needed") {
     sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
     post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5)))
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index 051a13c..c45f104 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark
 
+import scala.concurrent.duration._
+
 import org.scalatest.Assertions
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.storage.StorageLevel
 
@@ -58,10 +61,12 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext {
   test("getRDDStorageInfo only reports on RDDs that actually persist data") {
     sc = new SparkContext("local", "test")
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-    assert(sc.getRDDStorageInfo.size === 0)
+    assert(sc.getRDDStorageInfo.length === 0)
     rdd.collect()
     sc.listenerBus.waitUntilEmpty(10000)
-    assert(sc.getRDDStorageInfo.size === 1)
+    eventually(timeout(10.seconds), interval(100.milliseconds)) {
+      assert(sc.getRDDStorageInfo.length === 1)
+    }
     assert(sc.getRDDStorageInfo.head.isCached)
     assert(sc.getRDDStorageInfo.head.memSize > 0)
     assert(sc.getRDDStorageInfo.head.storageLevel === StorageLevel.MEMORY_ONLY)
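The SparkContextInfoSuite change above converts a racy one-shot assertion into a polled one. For readers unfamiliar with ScalaTest's `Eventually`, here is a minimal standalone sketch of the same idiom; the "storage info" stand-in and its timing are made up for illustration:

```scala
import scala.concurrent.duration._

import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._

object EventuallySketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for asynchronously updated state (like the listener-bus-driven
    // RDD storage info above): it only becomes "ready" about 500ms after start.
    val readyAt = System.currentTimeMillis() + 500
    def storageInfoCount: Int = if (System.currentTimeMillis() >= readyAt) 1 else 0

    // eventually() re-runs the block every `interval` until it passes or the
    // `timeout` expires, turning a racy one-shot assert into a bounded poll.
    eventually(timeout(10.seconds), interval(100.milliseconds)) {
      assert(storageInfoCount === 1)
    }
  }
}
```

The same `scala.concurrent.duration._` plus `Eventually._` import pair used in the diff makes `10.seconds`-style durations usable as ScalaTest timeouts.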
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 3128902..ffb679f 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -20,7 +20,9 @@ package org.apache.spark
 // scalastyle:off
 import java.io.File
 
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
+import scala.annotation.tailrec
+
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, Outcome}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.AccumulatorContext
@@ -52,6 +54,7 @@ import org.apache.spark.util.AccumulatorContext
 abstract class SparkFunSuite
   extends FunSuite
   with BeforeAndAfterAll
+  with BeforeAndAfterEach
   with ThreadAudit
   with Logging {
 // scalastyle:on
@@ -88,6 +91,47 @@ abstract class SparkFunSuite
   }
 
   /**
+   * Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to
+   * set up and tear down resources.
+   */
+  def testRetry(s: String, n: Int = 2)(body: => Unit): Unit = {
+    test(s) {
+      retry(n) {
+        body
+      }
+    }
+  }
+
+  /**
+   * Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to
+   * set up and tear down resources.
+   */
+  def retry[T](n: Int)(body: => T): T = {
+    if (this.isInstanceOf[BeforeAndAfter]) {
+      throw new UnsupportedOperationException(
+        s"testRetry/retry cannot be used with ${classOf[BeforeAndAfter]}. " +
+          s"Please use ${classOf[BeforeAndAfterEach]} instead.")
+    }
+    retry0(n, n)(body)
+  }
+
+  @tailrec private final def retry0[T](n: Int, n0: Int)(body: => T): T = {
+    try body
+    catch { case e: Throwable =>
+      if (n > 0) {
+        logWarning(e.getMessage, e)
+        logInfo(s"\n\n===== RETRY #${n0 - n + 1} =====\n")
+        // Reset state before re-attempting so that tests which use patterns like
+        // LocalSparkContext to clean up state can work correctly when retried.
+        afterEach()
+        beforeEach()
+        retry0(n - 1, n0)(body)
+      }
+      else throw e
+    }
+  }
+
+  /**
    * Log the suite name and the test name before and after each test.
    *
    * Subclasses should never override this method. If they wish to run
diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
index a15ae04..75812ae 100644
--- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.JobExecutionStatus._
 
 class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkContext {
 
-  test("basic status API usage") {
+  testRetry("basic status API usage") {
     sc = new SparkContext("local", "test", new SparkConf(false))
     val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync()
     val jobId: Int = eventually(timeout(10 seconds)) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 6aad00b..dc15da5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -34,7 +34,6 @@ import org.json4s.jackson.JsonMethods._
 import org.mockito.ArgumentMatcher
 import org.mockito.Matchers.{any, argThat}
 import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
-import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
@@ -48,16 +47,21 @@ import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
 import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
 
-class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
+class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
 
   private var testDir: File = null
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     testDir = Utils.createTempDir(namePrefix = s"a b%20c+d")
   }
 
-  after {
-    Utils.deleteRecursively(testDir)
+  override def afterEach(): Unit = {
+    try {
+      Utils.deleteRecursively(testDir)
+    } finally {
+      super.afterEach()
+    }
   }
 
   /** Create a fake log file using the new log format used in Spark 1.3+ */
@@ -487,7 +491,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       provider.inSafeMode = false
       clock.setTime(10000)
 
-      eventually(timeout(1 second), interval(10 millis)) {
+      eventually(timeout(3.second), interval(10.milliseconds)) {
         provider.getConfig().keys should not contain ("HDFS State")
       }
     } finally {
@@ -495,7 +499,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
-  test("provider reports error after FS leaves safe mode") {
+  testRetry("provider reports error after FS leaves safe mode") {
     testDir.delete()
     val clock = new ManualClock()
     val provider = new SafeModeTestProvider(createTestConf(), clock)
@@ -505,7 +509,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       provider.inSafeMode = false
      clock.setTime(10000)
 
-      eventually(timeout(1 second), interval(10 millis)) {
+      eventually(timeout(3.second), interval(10.milliseconds)) {
        verify(errorHandler).uncaughtException(any(), any())
      }
    } finally {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index 123f7f4..a6576e0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -19,25 +19,23 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable
 
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-
 import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 /**
  * Unit tests for SparkListener that require a local cluster.
  */
-class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
-  with BeforeAndAfter with BeforeAndAfterAll {
+class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext {
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     sc = new SparkContext("local-cluster[2,1,1024]", "SparkListenerSuite")
   }
 
-  test("SparkListener sends executor added message") {
+  testRetry("SparkListener sends executor added message") {
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)
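The recurring migration in these suites, from ScalaTest's `before {}`/`after {}` to `beforeEach()`/`afterEach()` overrides, follows one fixed idiom: call `super.beforeEach()` first during setup, and keep `super.afterEach()` in a `finally` block so parent teardown runs even when cleanup throws. A minimal sketch with a hypothetical suite and resource:

```scala
import java.io.{Closeable, StringReader}

import org.scalatest.{BeforeAndAfterEach, FunSuite}

// Hypothetical suite showing the setup/teardown chaining used in the diffs
// above: parent traits set up before our own state, and parent teardown is
// guaranteed to run even if our cleanup throws.
class ScratchResourceSuite extends FunSuite with BeforeAndAfterEach {

  private var resource: Closeable = _

  override def beforeEach(): Unit = {
    super.beforeEach()                        // parent traits set up first
    resource = new StringReader("test-data")  // acquire per-test state
  }

  override def afterEach(): Unit = {
    try {
      resource.close()                        // release per-test state
    } finally {
      super.afterEach()                       // parent teardown always runs
    }
  }

  test("resource is freshly initialized for each test") {
    assert(resource != null)
  }
}
```

This chaining is what makes `testRetry` workable: the retry loop can call `afterEach()`/`beforeEach()` to reset state between attempts, something the unchained `before {}`/`after {}` style does not allow.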
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 56b27fa..1edbe17 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -430,6 +430,84 @@ object SparkBuild extends PomBuild {
       else x.settings(Seq[Setting[_]](): _*)
     } ++ Seq[Project](OldDeps.project)
   }
+
+  if (!sys.env.contains("SERIAL_SBT_TESTS")) {
+    allProjects.foreach(enable(SparkParallelTestGrouping.settings))
+  }
+}
+
+object SparkParallelTestGrouping {
+  // Settings for parallelizing tests. The basic strategy here is to run the slowest suites (or
+  // collections of suites) in their own forked JVMs, allowing us to gain parallelism within an
+  // SBT project. Here, we take a whitelisting approach where the default behavior is to run all
+  // tests sequentially in a single JVM, requiring us to manually opt in to the extra parallelism.
+  //
+  // There are a few reasons why such a whitelist approach is good:
+  //
+  //  1. Launching one JVM per suite adds significant overhead for short-running suites. In
+  //     addition to JVM startup time and JIT warmup, it appears that initialization of Derby
+  //     metastores can be very slow, so creating a fresh warehouse per suite is inefficient.
+  //
+  //  2. When parallelizing within a project we need to give each forked JVM a different tmpdir
+  //     so that the metastore warehouses do not collide. Unfortunately, it seems that there are
+  //     some tests which have an overly tight dependency on the default tmpdir, so those fragile
+  //     tests need to continue running in the default configuration (or need to be rewritten).
+  //     Fixing that problem would be a huge amount of work for limited payoff in most cases
+  //     because most test suites are short-running.
+  //
+
+  private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set(
+    "org.apache.spark.DistributedSuite",
+    "org.apache.spark.sql.catalyst.expressions.DateExpressionsSuite",
+    "org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite",
+    "org.apache.spark.sql.catalyst.expressions.CastSuite",
+    "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
+    "org.apache.spark.sql.hive.HiveExternalCatalogSuite",
+    "org.apache.spark.sql.hive.StatisticsSuite",
+    "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
+    "org.apache.spark.sql.hive.client.VersionsSuite",
+    "org.apache.spark.sql.hive.client.HiveClientVersions",
+    "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
+    "org.apache.spark.ml.classification.LogisticRegressionSuite",
+    "org.apache.spark.ml.classification.LinearSVCSuite",
+    "org.apache.spark.sql.SQLQueryTestSuite"
+  )
+
+  private val DEFAULT_TEST_GROUP = "default_test_group"
+
+  private def testNameToTestGroup(name: String): String = name match {
+    case _ if testsWhichShouldRunInTheirOwnDedicatedJvm.contains(name) => name
+    case _ => DEFAULT_TEST_GROUP
+  }
+
+  lazy val settings = Seq(
+    testGrouping in Test := {
+      val tests: Seq[TestDefinition] = (definedTests in Test).value
+      val defaultForkOptions = ForkOptions(
+        bootJars = Nil,
+        javaHome = javaHome.value,
+        connectInput = connectInput.value,
+        outputStrategy = outputStrategy.value,
+        runJVMOptions = (javaOptions in Test).value,
+        workingDirectory = Some(baseDirectory.value),
+        envVars = (envVars in Test).value
+      )
+      tests.groupBy(test => testNameToTestGroup(test.name)).map { case (groupName, groupTests) =>
+        val forkOptions = {
+          if (groupName == DEFAULT_TEST_GROUP) {
+            defaultForkOptions
+          } else {
+            defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++
+              Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/$groupName"))
+          }
+        }
+        new Tests.Group(
+          name = groupName,
+          tests = groupTests,
+          runPolicy = Tests.SubProcess(forkOptions))
+      }
+    }.toSeq
+  )
 }
 
 object Core {
@@ -844,8 +922,14 @@ object TestSettings {
     testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
     // Enable Junit testing.
     libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test",
-    // Only allow one test at a time, even across projects, since they run in the same JVM
-    parallelExecution in Test := false,
+    // `parallelExecutionInTest` controls whether test suites belonging to the same SBT project
+    // can run in parallel with one another. It does NOT control whether tests execute in parallel
+    // within the same JVM (which is controlled by `testForkedParallel`) or whether test cases
+    // within the same suite can run in parallel (which is a ScalaTest runner option passed to
+    // the underlying runner, not an SBT-level configuration). This needs to be `true` in order
+    // for the extra parallelism enabled by `SparkParallelTestGrouping` to take effect.
+    // The `SERIAL_SBT_TESTS` check is here so the extra parallelism can be feature-flagged.
+    parallelExecution in Test := { if (sys.env.contains("SERIAL_SBT_TESTS")) false else true },
     // Make sure the test temp directory exists.
     resourceGenerators in Test += Def.macroValueI(resourceManaged in Test map { outDir: File =>
       var dir = new File(testTempDir)
@@ -866,7 +950,12 @@ object TestSettings {
       }
       Seq.empty[File]
     }).value,
-    concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
+    concurrentRestrictions in Global := {
+      // The number of concurrent test groups is empirically chosen based on experience
+      // with Jenkins flakiness.
+      if (sys.env.contains("SERIAL_SBT_TESTS")) (concurrentRestrictions in Global).value
+      else Seq(Tags.limit(Tags.ForkedTestGroup, 4))
+    }
   )
 }
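To see the whitelist rule above in isolation: each whitelisted suite maps to a group named after itself, and each group gets its own forked JVM and its own `-Djava.io.tmpdir`; everything else shares the default group and a single JVM. A standalone sketch with a hypothetical whitelist (the suite names here are made up); note that the diff gates all of this on the `SERIAL_SBT_TESTS` environment variable, so setting it (e.g. `SERIAL_SBT_TESTS=1 build/sbt test`, assuming Spark's usual sbt launcher) restores the fully serial behavior:

```scala
// Standalone sketch of the grouping rule above; the whitelist contents are
// hypothetical. Whitelisted suites each map to a group named after the suite
// (one forked JVM per group), while all other suites share the default group.
object TestGroupingSketch {
  private val dedicatedJvmSuites = Set("org.example.HugeSuite")
  private val DEFAULT_TEST_GROUP = "default_test_group"

  def testNameToTestGroup(name: String): String =
    if (dedicatedJvmSuites.contains(name)) name else DEFAULT_TEST_GROUP

  def main(args: Array[String]): Unit = {
    assert(testNameToTestGroup("org.example.HugeSuite") == "org.example.HugeSuite")
    assert(testNameToTestGroup("org.example.TinySuite") == DEFAULT_TEST_GROUP)
  }
}
```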
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 3a79131..48ce178 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -169,7 +169,7 @@ abstract class BaseYarnClusterSuite
     val handle = launcher.startApplication()
 
     try {
-      eventually(timeout(2 minutes), interval(1 second)) {
+      eventually(timeout(3.minutes), interval(1.second)) {
         assert(handle.getState().isFinal())
       }
     } finally {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 32e3d05..2c34b4e 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -202,7 +202,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       .startApplication()
 
     try {
-      eventually(timeout(30 seconds), interval(100 millis)) {
+      eventually(timeout(3.minutes), interval(100.milliseconds)) {
         handle.getState() should be (SparkAppHandle.State.RUNNING)
       }
 
@@ -210,7 +210,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       handle.getAppId() should startWith ("application_")
       handle.stop()
 
-      eventually(timeout(30 seconds), interval(100 millis)) {
+      eventually(timeout(3.minutes), interval(100.milliseconds)) {
         handle.getState() should be (SparkAppHandle.State.KILLED)
       }
     } finally {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index ab817ff..f3a741f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -267,9 +267,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     val df = session.sql(sql)
     val schema = df.schema
     val notIncludedMsg = "[not included in comparison]"
+    val clsName = this.getClass.getCanonicalName
     // Get answer, but also get rid of the #1234 expression ids that show up in explain plans
     val answer = df.queryExecution.hiveResultString().map(_.replaceAll("#\\d+", "#x")
-      .replaceAll("Location.*/sql/core/", s"Location ${notIncludedMsg}sql/core/")
+      .replaceAll(
+        s"Location.*/sql/core/spark-warehouse/$clsName/",
+        s"Location ${notIncludedMsg}sql/core/spark-warehouse/")
       .replaceAll("Created By.*", s"Created By $notIncludedMsg")
       .replaceAll("Created Time.*", s"Created Time $notIncludedMsg")
       .replaceAll("Last Access.*", s"Last Access $notIncludedMsg")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index c5f3fe5..0091f8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -490,7 +490,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     }
 
     // Wait for listener to finish computing the metrics for the execution.
-    while (statusStore.executionsList().last.metricValues == null) {
+    while (statusStore.executionsList().isEmpty ||
+        statusStore.executionsList().last.metricValues == null) {
       Thread.sleep(100)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fb0b365..9f6553e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -195,7 +195,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   import testImplicits._
 
-  override val streamingTimeout = 20.seconds
+  override val streamingTimeout = 80.seconds
 
   /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
   private def createFileStreamSource(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 35644c5..7bd1320 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -87,7 +87,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
   protected val defaultUseV2Sink = false
 
   /** How long to wait for an active stream to catch up when checking a result. */
-  val streamingTimeout = 10.seconds
+  val streamingTimeout = 60.seconds
 
   /** A trait for actions that can be performed while testing a streaming DataFrame. */
   trait StreamAction
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 46eec73..d17d035 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -36,21 +36,26 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.Utils
 
-class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
+class StreamingQueryManagerSuite extends StreamTest {
 
   import AwaitTerminationTester._
   import testImplicits._
 
   override val streamingTimeout = 20.seconds
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     assert(spark.streams.active.isEmpty)
     spark.streams.resetTerminated()
   }
 
-  after {
-    assert(spark.streams.active.isEmpty)
-    spark.streams.resetTerminated()
+  override def afterEach(): Unit = {
+    try {
+      assert(spark.streams.active.isEmpty)
+      spark.streams.resetTerminated()
+    } finally {
+      super.afterEach()
+    }
   }
 
   testQuietly("listing") {
@@ -84,7 +89,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  testQuietly("awaitAnyTermination without timeout and resetTerminated") {
+  testRetry("awaitAnyTermination without timeout and resetTerminated") {
     val datasets = Seq.fill(5)(makeDataset._2)
     withQueriesOn(datasets: _*) { queries =>
       require(queries.size === datasets.size)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index e7e0ce6..efdbc7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -25,7 +25,7 @@ import org.scalatest.concurrent.Eventually
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -36,7 +36,7 @@ trait SharedSparkSession
   with Eventually { self: Suite =>
 
   protected def sparkConf = {
-    new SparkConf()
+    val conf = new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
@@ -45,6 +45,9 @@ trait SharedSparkSession
       // this rule may potentially block testing of other optimization rules such as
       // ConstantPropagation etc.
       .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
+    conf.set(
+      StaticSQLConf.WAREHOUSE_PATH,
+      conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName)
   }
 
   /**
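The SharedSparkSession change above derives a per-suite warehouse location, which is what keeps concurrently forked JVMs from sharing (and corrupting) a single Derby warehouse; the SQLQueryTestSuite change earlier in the diff then normalizes that per-class path segment back out of golden-file output. The idea in miniature, with illustrative names:

```scala
// Sketch of the isolation idea above: derive the warehouse directory from the
// suite's class name so that suites running concurrently in different forked
// JVMs never share a Derby metastore/warehouse. All names are illustrative.
object WarehouseIsolationSketch {
  def perSuiteWarehouse(baseWarehouse: String, suiteClass: Class[_]): String =
    baseWarehouse + "/" + suiteClass.getCanonicalName

  def main(args: Array[String]): Unit = {
    // Prints "spark-warehouse/WarehouseIsolationSketch$" for this object.
    println(perSuiteWarehouse("spark-warehouse", getClass))
  }
}
```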
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index fa9f753..5bdb13a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType}
+import org.apache.spark.util.Utils
 
 // TODO: Refactor this to `HivePartitionFilteringSuite`
 class HiveClientSuite(version: String)
@@ -45,6 +46,7 @@ class HiveClientSuite(version: String)
 
     val hadoopConf = new Configuration()
     hadoopConf.setBoolean(tryDirectSqlKey, tryDirectSql)
+    hadoopConf.set("hive.metastore.warehouse.dir", Utils.createTempDir().toURI().toString())
     val client = buildClient(hadoopConf)
     client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (ds INT, h INT, chunk STRING)")
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index fc6218a..33f93da 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -121,7 +121,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
     }
 
     // Verify that stopping actually stops the thread
-    failAfter(100 millis) {
+    failAfter(1.second) {
       receiver.stop("test")
       assert(receiver.isStopped)
       assert(!receiver.otherThread.isAlive)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org