This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 0d26cfc  [SPARK-27460][TESTS][2.4] Running slowest test suites in their own forked JVMs for higher parallelism
0d26cfc is described below

commit 0d26cfcecd451a42a8d3eb14e6d66a88e955ce9e
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Thu Sep 19 21:03:24 2019 -0700

    [SPARK-27460][TESTS][2.4] Running slowest test suites in their own forked JVMs for higher parallelism
    
    ## What changes were proposed in this pull request?
    
    This is a backport of https://github.com/apache/spark/pull/24373,
    https://github.com/apache/spark/pull/24404 and
    https://github.com/apache/spark/pull/24434
    
    This patch modifies SparkBuild so that the largest / slowest test suites
    (or collections of suites) can run in their own forked JVMs, allowing them to
    be run in parallel with each other. This opt-in / whitelisting approach allows
    us to increase parallelism without having to fix a long tail of flakiness /
    brittleness issues in tests which aren't performance bottlenecks.
    
    See comments in SparkBuild.scala for information on the details, including
    a summary of why we sometimes opt to run entire groups of tests in a single
    forked JVM.
    
    The duration of a full pull request test run in Jenkins is reduced by around 53%:
    before changes: 4hr 40min
    after changes: 2hr 13min
    
    ## How was this patch tested?
    
    Unit test
    
    Closes #25861 from dongjoon-hyun/SPARK-27460.
    
    Lead-authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Co-authored-by: gatorsmile <gatorsm...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/ExecutorAllocationManagerSuite.scala     | 18 ++--
 .../org/apache/spark/SparkContextInfoSuite.scala   |  9 +-
 .../scala/org/apache/spark/SparkFunSuite.scala     | 46 ++++++++++-
 .../org/apache/spark/StatusTrackerSuite.scala      |  2 +-
 .../deploy/history/FsHistoryProviderSuite.scala    | 20 +++--
 .../scheduler/SparkListenerWithClusterSuite.scala  | 10 +--
 project/SparkBuild.scala                           | 95 +++++++++++++++++++++-
 .../spark/deploy/yarn/BaseYarnClusterSuite.scala   |  2 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala       |  4 +-
 .../org/apache/spark/sql/SQLQueryTestSuite.scala   |  5 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala   |  3 +-
 .../sql/streaming/FileStreamSourceSuite.scala      |  2 +-
 .../apache/spark/sql/streaming/StreamTest.scala    |  2 +-
 .../sql/streaming/StreamingQueryManagerSuite.scala | 17 ++--
 .../apache/spark/sql/test/SharedSparkSession.scala |  7 +-
 .../spark/sql/hive/client/HiveClientSuite.scala    |  2 +
 .../org/apache/spark/streaming/ReceiverSuite.scala |  2 +-
 17 files changed, 202 insertions(+), 44 deletions(-)

diff --git 
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index a69045f..df5d265 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.mockito.Matchers.{any, eq => meq}
 import org.mockito.Mockito.{mock, never, verify, when}
-import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.config
@@ -37,20 +37,24 @@ import org.apache.spark.util.ManualClock
  */
 class ExecutorAllocationManagerSuite
   extends SparkFunSuite
-  with LocalSparkContext
-  with BeforeAndAfter {
+  with LocalSparkContext {
 
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._
 
   private val contexts = new mutable.ListBuffer[SparkContext]()
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     contexts.clear()
   }
 
-  after {
-    contexts.foreach(_.stop())
+  override def afterEach(): Unit = {
+    try {
+      contexts.foreach(_.stop())
+    } finally {
+      super.afterEach()
+    }
   }
 
   private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = {
@@ -281,7 +285,7 @@ class ExecutorAllocationManagerSuite
     assert(totalRunningTasks(manager) === 0)
   }
 
-  test("cancel pending executors when no longer needed") {
+  testRetry("cancel pending executors when no longer needed") {
     sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
     post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5)))
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index 051a13c..c45f104 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark
 
+import scala.concurrent.duration._
+
 import org.scalatest.Assertions
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.storage.StorageLevel
 
@@ -58,10 +61,12 @@ class SparkContextInfoSuite extends SparkFunSuite with 
LocalSparkContext {
   test("getRDDStorageInfo only reports on RDDs that actually persist data") {
     sc = new SparkContext("local", "test")
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-    assert(sc.getRDDStorageInfo.size === 0)
+    assert(sc.getRDDStorageInfo.length === 0)
     rdd.collect()
     sc.listenerBus.waitUntilEmpty(10000)
-    assert(sc.getRDDStorageInfo.size === 1)
+    eventually(timeout(10.seconds), interval(100.milliseconds)) {
+      assert(sc.getRDDStorageInfo.length === 1)
+    }
     assert(sc.getRDDStorageInfo.head.isCached)
     assert(sc.getRDDStorageInfo.head.memSize > 0)
     assert(sc.getRDDStorageInfo.head.storageLevel === StorageLevel.MEMORY_ONLY)
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 3128902..ffb679f 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -20,7 +20,9 @@ package org.apache.spark
 // scalastyle:off
 import java.io.File
 
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
+import scala.annotation.tailrec
+
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, 
FunSuite, Outcome}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.AccumulatorContext
@@ -52,6 +54,7 @@ import org.apache.spark.util.AccumulatorContext
 abstract class SparkFunSuite
   extends FunSuite
   with BeforeAndAfterAll
+  with BeforeAndAfterEach
   with ThreadAudit
   with Logging {
 // scalastyle:on
@@ -88,6 +91,47 @@ abstract class SparkFunSuite
   }
 
   /**
+   * Note: this method doesn't support `BeforeAndAfter`. You must use 
`BeforeAndAfterEach` to
+   * set up and tear down resources.
+   */
+  def testRetry(s: String, n: Int = 2)(body: => Unit): Unit = {
+    test(s) {
+      retry(n) {
+        body
+      }
+    }
+  }
+
+  /**
+   * Note: this method doesn't support `BeforeAndAfter`. You must use 
`BeforeAndAfterEach` to
+   * set up and tear down resources.
+   */
+  def retry[T](n: Int)(body: => T): T = {
+    if (this.isInstanceOf[BeforeAndAfter]) {
+      throw new UnsupportedOperationException(
+        s"testRetry/retry cannot be used with ${classOf[BeforeAndAfter]}. " +
+          s"Please use ${classOf[BeforeAndAfterEach]} instead.")
+    }
+    retry0(n, n)(body)
+  }
+
+  @tailrec private final def retry0[T](n: Int, n0: Int)(body: => T): T = {
+    try body
+    catch { case e: Throwable =>
+      if (n > 0) {
+        logWarning(e.getMessage, e)
+        logInfo(s"\n\n===== RETRY #${n0 - n + 1} =====\n")
+        // Reset state before re-attempting in order so that tests which use 
patterns like
+        // LocalSparkContext to clean up state can work correctly when retried.
+        afterEach()
+        beforeEach()
+        retry0(n-1, n0)(body)
+      }
+      else throw e
+    }
+  }
+
+  /**
    * Log the suite name and the test name before and after each test.
    *
    * Subclasses should never override this method. If they wish to run
diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
index a15ae04..75812ae 100644
--- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.JobExecutionStatus._
 
 class StatusTrackerSuite extends SparkFunSuite with Matchers with 
LocalSparkContext {
 
-  test("basic status API usage") {
+  testRetry("basic status API usage") {
     sc = new SparkContext("local", "test", new SparkConf(false))
     val jobFuture = sc.parallelize(1 to 10000, 
2).map(identity).groupBy(identity).collectAsync()
     val jobId: Int = eventually(timeout(10 seconds)) {
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 6aad00b..dc15da5 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -34,7 +34,6 @@ import org.json4s.jackson.JsonMethods._
 import org.mockito.ArgumentMatcher
 import org.mockito.Matchers.{any, argThat}
 import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
-import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
@@ -48,16 +47,21 @@ import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
 import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
 
-class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with 
Matchers with Logging {
+class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
 
   private var testDir: File = null
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     testDir = Utils.createTempDir(namePrefix = s"a b%20c+d")
   }
 
-  after {
-    Utils.deleteRecursively(testDir)
+  override def afterEach(): Unit = {
+    try {
+      Utils.deleteRecursively(testDir)
+    } finally {
+      super.afterEach()
+    }
   }
 
   /** Create a fake log file using the new log format used in Spark 1.3+ */
@@ -487,7 +491,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
       provider.inSafeMode = false
       clock.setTime(10000)
 
-      eventually(timeout(1 second), interval(10 millis)) {
+      eventually(timeout(3.second), interval(10.milliseconds)) {
         provider.getConfig().keys should not contain ("HDFS State")
       }
     } finally {
@@ -495,7 +499,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
     }
   }
 
-  test("provider reports error after FS leaves safe mode") {
+  testRetry("provider reports error after FS leaves safe mode") {
     testDir.delete()
     val clock = new ManualClock()
     val provider = new SafeModeTestProvider(createTestConf(), clock)
@@ -505,7 +509,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
       provider.inSafeMode = false
       clock.setTime(10000)
 
-      eventually(timeout(1 second), interval(10 millis)) {
+      eventually(timeout(3.second), interval(10.milliseconds)) {
         verify(errorHandler).uncaughtException(any(), any())
       }
     } finally {
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index 123f7f4..a6576e0 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -19,25 +19,23 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable
 
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-
 import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, 
TestUtils}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 /**
  * Unit tests for SparkListener that require a local cluster.
  */
-class SparkListenerWithClusterSuite extends SparkFunSuite with 
LocalSparkContext
-  with BeforeAndAfter with BeforeAndAfterAll {
+class SparkListenerWithClusterSuite extends SparkFunSuite with 
LocalSparkContext {
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     sc = new SparkContext("local-cluster[2,1,1024]", "SparkListenerSuite")
   }
 
-  test("SparkListener sends executor added message") {
+  testRetry("SparkListener sends executor added message") {
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)
 
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 56b27fa..1edbe17 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -430,6 +430,84 @@ object SparkBuild extends PomBuild {
       else x.settings(Seq[Setting[_]](): _*)
     } ++ Seq[Project](OldDeps.project)
   }
+
+  if (!sys.env.contains("SERIAL_SBT_TESTS")) {
+    allProjects.foreach(enable(SparkParallelTestGrouping.settings))
+  }
+}
+
+object SparkParallelTestGrouping {
+  // Settings for parallelizing tests. The basic strategy here is to run the 
slowest suites (or
+  // collections of suites) in their own forked JVMs, allowing us to gain 
parallelism within a
+  // SBT project. Here, we take a whitelisting approach where the default 
behavior is to run all
+  // tests sequentially in a single JVM, requiring us to manually opt-in to 
the extra parallelism.
+  //
+  // There are a reasons why such a whitelist approach is good:
+  //
+  //    1. Launching one JVM per suite adds significant overhead for 
short-running suites. In
+  //       addition to JVM startup time and JIT warmup, it appears that 
initialization of Derby
+  //       metastores can be very slow so creating a fresh warehouse per suite 
is inefficient.
+  //
+  //    2. When parallelizing within a project we need to give each forked JVM 
a different tmpdir
+  //       so that the metastore warehouses do not collide. Unfortunately, it 
seems that there are
+  //       some tests which have an overly tight dependency on the default 
tmpdir, so those fragile
+  //       tests need to continue re-running in the default configuration (or 
need to be rewritten).
+  //       Fixing that problem would be a huge amount of work for limited 
payoff in most cases
+  //       because most test suites are short-running.
+  //
+
+  private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set(
+    "org.apache.spark.DistributedSuite",
+    "org.apache.spark.sql.catalyst.expressions.DateExpressionsSuite",
+    "org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite",
+    "org.apache.spark.sql.catalyst.expressions.CastSuite",
+    "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
+    "org.apache.spark.sql.hive.HiveExternalCatalogSuite",
+    "org.apache.spark.sql.hive.StatisticsSuite",
+    "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
+    "org.apache.spark.sql.hive.client.VersionsSuite",
+    "org.apache.spark.sql.hive.client.HiveClientVersions",
+    "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
+    "org.apache.spark.ml.classification.LogisticRegressionSuite",
+    "org.apache.spark.ml.classification.LinearSVCSuite",
+    "org.apache.spark.sql.SQLQueryTestSuite"
+  )
+
+  private val DEFAULT_TEST_GROUP = "default_test_group"
+
+  private def testNameToTestGroup(name: String): String = name match {
+    case _ if testsWhichShouldRunInTheirOwnDedicatedJvm.contains(name) => name
+    case _ => DEFAULT_TEST_GROUP
+  }
+
+  lazy val settings = Seq(
+    testGrouping in Test := {
+      val tests: Seq[TestDefinition] = (definedTests in Test).value
+      val defaultForkOptions = ForkOptions(
+        bootJars = Nil,
+        javaHome = javaHome.value,
+        connectInput = connectInput.value,
+        outputStrategy = outputStrategy.value,
+        runJVMOptions = (javaOptions in Test).value,
+        workingDirectory = Some(baseDirectory.value),
+        envVars = (envVars in Test).value
+      )
+      tests.groupBy(test => testNameToTestGroup(test.name)).map { case 
(groupName, groupTests) =>
+        val forkOptions = {
+          if (groupName == DEFAULT_TEST_GROUP) {
+            defaultForkOptions
+          } else {
+            defaultForkOptions.copy(runJVMOptions = 
defaultForkOptions.runJVMOptions ++
+              
Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/$groupName"))
+          }
+        }
+        new Tests.Group(
+          name = groupName,
+          tests = groupTests,
+          runPolicy = Tests.SubProcess(forkOptions))
+      }
+    }.toSeq
+  )
 }
 
 object Core {
@@ -844,8 +922,14 @@ object TestSettings {
     testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
     // Enable Junit testing.
     libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % 
"test",
-    // Only allow one test at a time, even across projects, since they run in 
the same JVM
-    parallelExecution in Test := false,
+    // `parallelExecutionInTest` controls whether test suites belonging to the 
same SBT project
+    // can run in parallel with one another. It does NOT control whether tests 
execute in parallel
+    // within the same JVM (which is controlled by `testForkedParallel`) or 
whether test cases
+    // within the same suite can run in parallel (which is a ScalaTest runner 
option which is passed
+    // to the underlying runner but is not a SBT-level configuration). This 
needs to be `true` in
+    // order for the extra parallelism enabled by `SparkParallelTestGrouping` 
to take effect.
+    // The `SERIAL_SBT_TESTS` check is here so the extra parallelism can be 
feature-flagged.
+    parallelExecution in Test := { if (sys.env.contains("SERIAL_SBT_TESTS")) 
false else true },
     // Make sure the test temp directory exists.
     resourceGenerators in Test += Def.macroValueI(resourceManaged in Test map 
{ outDir: File =>
       var dir = new File(testTempDir)
@@ -866,7 +950,12 @@ object TestSettings {
       }
       Seq.empty[File]
     }).value,
-    concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
+    concurrentRestrictions in Global := {
+      // The number of concurrent test groups is empirically chosen based on 
experience
+      // with Jenkins flakiness.
+      if (sys.env.contains("SERIAL_SBT_TESTS")) (concurrentRestrictions in 
Global).value
+      else Seq(Tags.limit(Tags.ForkedTestGroup, 4))
+    }
   )
 
 }
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 3a79131..48ce178 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -169,7 +169,7 @@ abstract class BaseYarnClusterSuite
 
     val handle = launcher.startApplication()
     try {
-      eventually(timeout(2 minutes), interval(1 second)) {
+      eventually(timeout(3.minutes), interval(1.second)) {
         assert(handle.getState().isFinal())
       }
     } finally {
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 32e3d05..2c34b4e 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -202,7 +202,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       .startApplication()
 
     try {
-      eventually(timeout(30 seconds), interval(100 millis)) {
+      eventually(timeout(3.minutes), interval(100.milliseconds)) {
         handle.getState() should be (SparkAppHandle.State.RUNNING)
       }
 
@@ -210,7 +210,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       handle.getAppId() should startWith ("application_")
       handle.stop()
 
-      eventually(timeout(30 seconds), interval(100 millis)) {
+      eventually(timeout(3.minutes), interval(100.milliseconds)) {
         handle.getState() should be (SparkAppHandle.State.KILLED)
       }
     } finally {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index ab817ff..f3a741f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -267,9 +267,12 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
       val df = session.sql(sql)
       val schema = df.schema
       val notIncludedMsg = "[not included in comparison]"
+      val clsName = this.getClass.getCanonicalName
       // Get answer, but also get rid of the #1234 expression ids that show up 
in explain plans
       val answer = 
df.queryExecution.hiveResultString().map(_.replaceAll("#\\d+", "#x")
-        .replaceAll("Location.*/sql/core/", s"Location 
${notIncludedMsg}sql/core/")
+        .replaceAll(
+          s"Location.*/sql/core/spark-warehouse/$clsName/",
+          s"Location ${notIncludedMsg}sql/core/spark-warehouse/")
         .replaceAll("Created By.*", s"Created By $notIncludedMsg")
         .replaceAll("Created Time.*", s"Created Time $notIncludedMsg")
         .replaceAll("Last Access.*", s"Last Access $notIncludedMsg")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index c5f3fe5..0091f8b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -490,7 +490,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with 
SharedSQLContext with
     }
 
     // Wait for listener to finish computing the metrics for the execution.
-    while (statusStore.executionsList().last.metricValues == null) {
+    while (statusStore.executionsList().isEmpty ||
+        statusStore.executionsList().last.metricValues == null) {
       Thread.sleep(100)
     }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fb0b365..9f6553e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -195,7 +195,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   import testImplicits._
 
-  override val streamingTimeout = 20.seconds
+  override val streamingTimeout = 80.seconds
 
   /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
   private def createFileStreamSource(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 35644c5..7bd1320 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -87,7 +87,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with 
TimeLimits with Be
   protected val defaultUseV2Sink = false
 
   /** How long to wait for an active stream to catch up when checking a 
result. */
-  val streamingTimeout = 10.seconds
+  val streamingTimeout = 60.seconds
 
   /** A trait for actions that can be performed while testing a streaming 
DataFrame. */
   trait StreamAction
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 46eec73..d17d035 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -36,21 +36,26 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.Utils
 
-class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
+class StreamingQueryManagerSuite extends StreamTest {
 
   import AwaitTerminationTester._
   import testImplicits._
 
   override val streamingTimeout = 20.seconds
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     assert(spark.streams.active.isEmpty)
     spark.streams.resetTerminated()
   }
 
-  after {
-    assert(spark.streams.active.isEmpty)
-    spark.streams.resetTerminated()
+  override def afterEach(): Unit = {
+    try {
+      assert(spark.streams.active.isEmpty)
+      spark.streams.resetTerminated()
+    } finally {
+      super.afterEach()
+    }
   }
 
   testQuietly("listing") {
@@ -84,7 +89,7 @@ class StreamingQueryManagerSuite extends StreamTest with 
BeforeAndAfter {
     }
   }
 
-  testQuietly("awaitAnyTermination without timeout and resetTerminated") {
+  testRetry("awaitAnyTermination without timeout and resetTerminated") {
     val datasets = Seq.fill(5)(makeDataset._2)
     withQueriesOn(datasets: _*) { queries =>
       require(queries.size === datasets.size)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index e7e0ce6..efdbc7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -25,7 +25,7 @@ import org.scalatest.concurrent.Eventually
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
 /**
  * Helper trait for SQL test suites where all tests share a single 
[[TestSparkSession]].
@@ -36,7 +36,7 @@ trait SharedSparkSession
   with Eventually { self: Suite =>
 
   protected def sparkConf = {
-    new SparkConf()
+    val conf = new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
@@ -45,6 +45,9 @@ trait SharedSparkSession
       // this rule may potentially block testing of other optimization rules 
such as
       // ConstantPropagation etc.
       .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, 
ConvertToLocalRelation.ruleName)
+    conf.set(
+      StaticSQLConf.WAREHOUSE_PATH,
+      conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName)
   }
 
   /**
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index fa9f753..5bdb13a 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType}
+import org.apache.spark.util.Utils
 
 // TODO: Refactor this to `HivePartitionFilteringSuite`
 class HiveClientSuite(version: String)
@@ -45,6 +46,7 @@ class HiveClientSuite(version: String)
 
     val hadoopConf = new Configuration()
     hadoopConf.setBoolean(tryDirectSqlKey, tryDirectSql)
+    hadoopConf.set("hive.metastore.warehouse.dir", 
Utils.createTempDir().toURI().toString())
     val client = buildClient(hadoopConf)
     client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (ds INT, h 
INT, chunk STRING)")
 
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index fc6218a..33f93da 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -121,7 +121,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits 
with Serializable {
     }
 
     // Verify that stopping actually stops the thread
-    failAfter(100 millis) {
+    failAfter(1.second) {
       receiver.stop("test")
       assert(receiver.isStopped)
       assert(!receiver.otherThread.isAlive)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to