asfgit closed pull request #23413: [SPARK-26491][CORE][TEST] Use ConfigEntry
for hardcoded configs for test categories
URL: https://github.com/apache/spark/pull/23413
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index d966582295b37..0807e653b41a9 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -27,6 +27,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
import org.apache.spark.metrics.source.Source
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMaster
@@ -157,7 +158,7 @@ private[spark] class ExecutorAllocationManager(
// Polling loop interval (ms)
private val intervalMillis: Long = if (Utils.isTesting) {
- conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100)
+ conf.get(TEST_SCHEDULE_INTERVAL)
} else {
100
}
@@ -899,5 +900,4 @@ private[spark] class ExecutorAllocationManager(
private object ExecutorAllocationManager {
val NOT_SET = Long.MaxValue
- val TESTING_SCHEDULE_INTERVAL_KEY =
"spark.testing.dynamicAllocation.scheduleInterval"
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 89be9de083075..3a1e1b9310029 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -45,6 +45,7 @@ import org.apache.spark.deploy.{LocalSparkCluster,
SparkHadoopUtil}
import org.apache.spark.input.{FixedLengthBinaryInputFormat,
PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -470,7 +471,7 @@ class SparkContext(config: SparkConf) extends Logging {
// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
- for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
+ for { (envKey, propKey) <- Seq(("SPARK_TESTING", IS_TESTING.key))
value <-
Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
executorEnvs(envKey) = value
}
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 709a380dfb636..3c5648434fa66 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -45,6 +45,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Status._
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ReplayListenerBus._
@@ -267,7 +268,7 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
}
// Disable the background thread during tests.
- if (!conf.contains("spark.testing")) {
+ if (!conf.contains(IS_TESTING)) {
// A task that periodically checks for event log updates on disk.
logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
pool.scheduleWithFixedDelay(
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index d5ea2523c628b..467df26c47354 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -37,6 +37,7 @@ import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils,
Utils}
@@ -103,7 +104,6 @@ private[deploy] class Worker(
private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", true)
- private val testing: Boolean = sys.props.contains("spark.testing")
private var master: Option[RpcEndpointRef] = None
/**
@@ -127,7 +127,7 @@ private[deploy] class Worker(
private var connected = false
private val workerId = generateWorkerId()
private val sparkHome =
- if (testing) {
+ if (sys.props.contains(IS_TESTING.key)) {
assert(sys.props.contains("spark.test.home"), "spark.test.home is not
set!")
new File(sys.props("spark.test.home"))
} else {
diff --git
a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
index af67f41e94af1..f354d603c2e3d 100644
--- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
@@ -43,7 +43,7 @@ private[spark] case class ProcfsMetrics(
// project.
private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends
Logging {
private val procfsStatFile = "stat"
- private val testing = sys.env.contains("SPARK_TESTING") ||
sys.props.contains("spark.testing")
+ private val testing = Utils.isTesting
private val pageSize = computePageSize()
private var isAvailable: Boolean = isProcfsAvailable
private val pid = computePid()
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 85b2745a2aec4..ea79c7310349d 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.util._
@@ -202,7 +203,7 @@ class TaskMetrics private[spark] () extends Serializable {
}
// Only used for test
- private[spark] val testAccum = sys.props.get("spark.testing").map(_ => new
LongAccumulator)
+ private[spark] val testAccum = sys.props.get(IS_TESTING.key).map(_ => new
LongAccumulator)
import InternalAccumulator._
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
new file mode 100644
index 0000000000000..21660ab3a9512
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+private[spark] object Tests {
+
+ val TEST_USE_COMPRESSED_OOPS_KEY = "spark.test.useCompressedOops"
+
+ val TEST_MEMORY = ConfigBuilder("spark.testing.memory")
+ .longConf
+ .createWithDefault(Runtime.getRuntime.maxMemory)
+
+ val TEST_SCHEDULE_INTERVAL =
+ ConfigBuilder("spark.testing.dynamicAllocation.scheduleInterval")
+ .longConf
+ .createWithDefault(100)
+
+ val IS_TESTING = ConfigBuilder("spark.testing")
+ .booleanConf
+ .createOptional
+
+ val TEST_NO_STAGE_RETRY = ConfigBuilder("spark.test.noStageRetry")
+ .booleanConf
+ .createWithDefault(false)
+
+ val TEST_RESERVED_MEMORY = ConfigBuilder("spark.testing.reservedMemory")
+ .longConf
+ .createOptional
+
+ val TEST_N_HOSTS = ConfigBuilder("spark.testing.nHosts")
+ .intConf
+ .createWithDefault(5)
+
+ val TEST_N_EXECUTORS_HOST = ConfigBuilder("spark.testing.nExecutorsPerHost")
+ .intConf
+ .createWithDefault(4)
+
+ val TEST_N_CORES_EXECUTOR = ConfigBuilder("spark.testing.nCoresPerExecutor")
+ .intConf
+ .createWithDefault(2)
+}
diff --git
a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 8286087042741..43566cbb1279c 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.memory
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.storage.BlockId
/**
@@ -113,7 +114,7 @@ private[spark] object StaticMemoryManager {
* Return the total amount of memory available for the storage region, in
bytes.
*/
private def getMaxStorageMemory(conf: SparkConf): Long = {
- val systemMaxMemory = conf.getLong("spark.testing.memory",
Runtime.getRuntime.maxMemory)
+ val systemMaxMemory = conf.get(TEST_MEMORY)
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
@@ -123,7 +124,7 @@ private[spark] object StaticMemoryManager {
* Return the total amount of memory available for the execution region, in
bytes.
*/
private def getMaxExecutionMemory(conf: SparkConf): Long = {
- val systemMaxMemory = conf.getLong("spark.testing.memory",
Runtime.getRuntime.maxMemory)
+ val systemMaxMemory = conf.get(TEST_MEMORY)
if (systemMaxMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"System memory $systemMaxMemory must
" +
diff --git
a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 9260fd3a6fb34..7801bb87050f6 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.memory
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.storage.BlockId
/**
@@ -210,9 +211,9 @@ object UnifiedMemoryManager {
* Return the total amount of memory shared between execution and storage,
in bytes.
*/
private def getMaxMemory(conf: SparkConf): Long = {
- val systemMemory = conf.getLong("spark.testing.memory",
Runtime.getRuntime.maxMemory)
- val reservedMemory = conf.getLong("spark.testing.reservedMemory",
- if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
+ val systemMemory = conf.get(TEST_MEMORY)
+ val reservedMemory = conf.getLong(TEST_RESERVED_MEMORY.key,
+ if (conf.contains(IS_TESTING)) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6f4c326442e1e..f6ade180ee25f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,6 +38,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener,
ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
@@ -186,7 +187,7 @@ private[spark] class DAGScheduler(
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
/** If enabled, FetchFailed will not cause stage retry, in order to surface
the problem. */
- private val disallowStageRetryForTest =
sc.getConf.getBoolean("spark.test.noStageRetry", false)
+ private val disallowStageRetryForTest = sc.getConf.get(TEST_NO_STAGE_RETRY)
/**
* Whether to unregister all the outputs on the host in condition that we
receive a FetchFailure,
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index adef20d3077d8..66080b6e6b4ff 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{StandaloneAppClient,
StandaloneAppClientListener}
import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler._
@@ -90,7 +91,7 @@ private[spark] class StandaloneSchedulerBackend(
// compute-classpath.{cmd,sh} and makes all needed jars available to child
processes
// when the assembly is built with the "*-provided" profiles enabled.
val testingClassPath =
- if (sys.props.contains("spark.testing")) {
+ if (sys.props.contains(IS_TESTING.key)) {
sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
} else {
Nil
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 3bfdf95db84c6..e12b6b71578c1 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -28,6 +28,7 @@ import com.google.common.collect.MapMaker
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.TEST_USE_COMPRESSED_OOPS_KEY
import org.apache.spark.util.collection.OpenHashSet
/**
@@ -126,8 +127,8 @@ object SizeEstimator extends Logging {
private def getIsCompressedOops: Boolean = {
// This is only used by tests to override the detection of compressed
oops. The test
// actually uses a system property instead of a SparkConf, so we'll stick
with that.
- if (System.getProperty("spark.test.useCompressedOops") != null) {
- return System.getProperty("spark.test.useCompressedOops").toBoolean
+ if (System.getProperty(TEST_USE_COMPRESSED_OOPS_KEY) != null) {
+ return System.getProperty(TEST_USE_COMPRESSED_OOPS_KEY).toBoolean
}
// java.vm.info provides compressed ref info for IBM JDKs
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3527fee68939d..16ef38142ad9f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,6 +60,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream,
SerializationStream, SerializerInstance}
@@ -1847,7 +1848,7 @@ private[spark] object Utils extends Logging {
* Indicates whether Spark is currently running unit tests.
*/
def isTesting: Boolean = {
- sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+ sys.env.contains("SPARK_TESTING") || sys.props.contains(IS_TESTING.key)
}
/**
@@ -2175,7 +2176,7 @@ private[spark] object Utils extends Logging {
*/
def portMaxRetries(conf: SparkConf): Int = {
val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
- if (conf.contains("spark.testing")) {
+ if (conf.contains(IS_TESTING)) {
// Set a higher number of retries for tests...
maxRetries.getOrElse(100)
} else {
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 4083b20c23594..21050e44414f5 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler,
TimeLimits}
import org.scalatest.time.{Millis, Span}
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.security.EncryptionFunSuite
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -217,7 +218,7 @@ class DistributedSuite extends SparkFunSuite with Matchers
with LocalSparkContex
val size = 10000
val conf = new SparkConf()
.set("spark.storage.unrollMemoryThreshold", "1024")
- .set("spark.testing.memory", (size / 2).toString)
+ .set(TEST_MEMORY, size.toLong / 2)
sc = new SparkContext(clusterUrl, "test", conf)
val data = sc.parallelize(1 to size, 2).persist(StorageLevel.MEMORY_ONLY)
assert(data.count() === size)
@@ -233,7 +234,7 @@ class DistributedSuite extends SparkFunSuite with Matchers
with LocalSparkContex
val numPartitions = 20
val conf = new SparkConf()
.set("spark.storage.unrollMemoryThreshold", "1024")
- .set("spark.testing.memory", size.toString)
+ .set(TEST_MEMORY, size.toLong)
sc = new SparkContext(clusterUrl, "test", conf)
val data = sc.parallelize(1 to size,
numPartitions).persist(StorageLevel.MEMORY_ONLY)
assert(data.count() === size)
diff --git
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 38f5e8c9f0ac8..6b310b9cb67aa 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ExternalClusterManager
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -1166,7 +1167,7 @@ class ExecutorAllocationManagerSuite
.set("spark.dynamicAllocation.testing", "true")
// SPARK-22864: effectively disable the allocation schedule by setting
the period to a
// really long value.
- .set(TESTING_SCHEDULE_INTERVAL_KEY, "10000")
+ .set(TEST_SCHEDULE_INTERVAL, 10000L)
val sc = new SparkContext(conf)
contexts += sc
sc
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 35f728cd57fe2..ffa70425ea367 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{Callable, CyclicBarrier,
Executors, ExecutorService
import org.scalatest.Matchers
import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
+import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD,
ShuffledRDD, SubtractedRDD}
import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener,
SparkListenerTaskEnd}
@@ -37,7 +38,7 @@ abstract class ShuffleSuite extends SparkFunSuite with
Matchers with LocalSparkC
// Ensure that the DAGScheduler doesn't retry stages whose fetches fail, so
that we accurately
// test that the shuffle works (rather than retrying until all blocks are
local to one Executor).
- conf.set("spark.test.noStageRetry", "true")
+ conf.set(TEST_NO_STAGE_RETRY, true)
test("groupByKey without compression") {
val myConf = conf.clone().set("spark.shuffle.compress", "false")
@@ -269,7 +270,7 @@ abstract class ShuffleSuite extends SparkFunSuite with
Matchers with LocalSparkC
}
test("[SPARK-4085] rerun map stage if reduce stage cannot find its local
shuffle file") {
- val myConf = conf.clone().set("spark.test.noStageRetry", "false")
+ val myConf = conf.clone().set(TEST_NO_STAGE_RETRY, false)
sc = new SparkContext("local", "test", myConf)
val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
rdd.count()
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index dad24d7c01b8b..7d114b1b0c144 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -23,6 +23,7 @@ import java.io.File
import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.util.{AccumulatorContext, Utils}
/**
@@ -59,7 +60,7 @@ abstract class SparkFunSuite
protected val enableAutoThreadAudit = true
protected override def beforeAll(): Unit = {
- System.setProperty("spark.testing", "true")
+ System.setProperty(IS_TESTING.key, "true")
if (enableAutoThreadAudit) {
doThreadPreAudit()
}
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
index 6b479873f69f2..5903ae71ec66e 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Files
import org.apache.spark._
import org.apache.spark.internal.config.History._
-import org.apache.spark.util.Utils
+import org.apache.spark.internal.config.Tests._
class HistoryServerArgumentsSuite extends SparkFunSuite {
@@ -31,14 +31,14 @@ class HistoryServerArgumentsSuite extends SparkFunSuite {
private val conf = new SparkConf()
.set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
.set(UPDATE_INTERVAL_S, 1L)
- .set("spark.testing", "true")
+ .set(IS_TESTING, true)
test("No Arguments Parsing") {
val argStrings = Array.empty[String]
val hsa = new HistoryServerArguments(conf, argStrings)
assert(conf.get(HISTORY_LOG_DIR) === logDir.getAbsolutePath)
assert(conf.get(UPDATE_INTERVAL_S) === 1L)
- assert(conf.get("spark.testing") === "true")
+ assert(conf.get(IS_TESTING).getOrElse(false))
}
test("Properties File Arguments Parsing --properties-file") {
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 96458c55b5f55..bb7d3c52bc9c4 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -47,6 +47,7 @@ import org.scalatest.selenium.WebBrowser
import org.apache.spark._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.status.api.v1.ApplicationInfo
import org.apache.spark.status.api.v1.JobData
import org.apache.spark.ui.SparkUI
@@ -81,7 +82,7 @@ class HistoryServerSuite extends SparkFunSuite with
BeforeAndAfter with Matchers
val conf = new SparkConf()
.set(HISTORY_LOG_DIR, logDir)
.set(UPDATE_INTERVAL_S.key, "0")
- .set("spark.testing", "true")
+ .set(IS_TESTING, true)
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
.set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true)
.set(EVENT_LOG_PROCESS_TREE_METRICS, true)
@@ -400,7 +401,7 @@ class HistoryServerSuite extends SparkFunSuite with
BeforeAndAfter with Matchers
*/
test("security manager starts with spark.authenticate set") {
val conf = new SparkConf()
- .set("spark.testing", "true")
+ .set(IS_TESTING, true)
.set(SecurityManager.SPARK_AUTH_CONF, "true")
HistoryServer.createSecurityManager(conf)
}
@@ -422,7 +423,7 @@ class HistoryServerSuite extends SparkFunSuite with
BeforeAndAfter with Matchers
.set(UPDATE_INTERVAL_S.key, "1s")
.set(EVENT_LOG_ENABLED, true)
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
- .remove("spark.testing")
+ .remove(IS_TESTING)
val provider = new FsHistoryProvider(myConf)
val securityManager = HistoryServer.createSecurityManager(myConf)
diff --git
a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 0f32fe4059fbb..c3275add50f48 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -21,6 +21,7 @@ import org.mockito.Mockito.when
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.storage.TestBlockId
import org.apache.spark.storage.memory.MemoryStore
@@ -48,8 +49,8 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
new StaticMemoryManager(
conf.clone
.set("spark.memory.fraction", "1")
- .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
- .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString),
+ .set(TEST_MEMORY, maxOnHeapExecutionMemory)
+ .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory),
maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
maxOnHeapStorageMemory = 0,
numCores = 1)
diff --git
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 5ce3453b682fe..8556e920daebb 100644
---
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.PrivateMethodTester
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.storage.TestBlockId
import org.apache.spark.storage.memory.MemoryStore
@@ -43,8 +44,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite
with PrivateMethodTes
maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
- .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
- .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString)
+ .set(TEST_MEMORY, maxOnHeapExecutionMemory)
+ .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory)
.set("spark.memory.storageFraction", storageFraction.toString)
UnifiedMemoryManager(conf, numCores = 1)
}
@@ -218,19 +219,19 @@ class UnifiedMemoryManagerSuite extends
MemoryManagerSuite with PrivateMethodTes
}
test("small heap") {
- val systemMemory = 1024 * 1024
- val reservedMemory = 300 * 1024
+ val systemMemory = 1024L * 1024
+ val reservedMemory = 300L * 1024
val memoryFraction = 0.8
val conf = new SparkConf()
.set("spark.memory.fraction", memoryFraction.toString)
- .set("spark.testing.memory", systemMemory.toString)
- .set("spark.testing.reservedMemory", reservedMemory.toString)
+ .set(TEST_MEMORY, systemMemory)
+ .set(TEST_RESERVED_MEMORY, reservedMemory)
val mm = UnifiedMemoryManager(conf, numCores = 1)
val expectedMaxMemory = ((systemMemory - reservedMemory) *
memoryFraction).toLong
assert(mm.maxHeapMemory === expectedMaxMemory)
// Try using a system memory that's too small
- val conf2 = conf.clone().set("spark.testing.memory", (reservedMemory /
2).toString)
+ val conf2 = conf.clone().set(TEST_MEMORY, reservedMemory / 2)
val exception = intercept[IllegalArgumentException] {
UnifiedMemoryManager(conf2, numCores = 1)
}
@@ -238,13 +239,13 @@ class UnifiedMemoryManagerSuite extends
MemoryManagerSuite with PrivateMethodTes
}
test("insufficient executor memory") {
- val systemMemory = 1024 * 1024
- val reservedMemory = 300 * 1024
+ val systemMemory = 1024L * 1024
+ val reservedMemory = 300L * 1024
val memoryFraction = 0.8
val conf = new SparkConf()
.set("spark.memory.fraction", memoryFraction.toString)
- .set("spark.testing.memory", systemMemory.toString)
- .set("spark.testing.reservedMemory", reservedMemory.toString)
+ .set(TEST_MEMORY, systemMemory)
+ .set(TEST_RESERVED_MEMORY, reservedMemory)
val mm = UnifiedMemoryManager(conf, numCores = 1)
// Try using an executor memory that's too small
@@ -259,7 +260,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite
with PrivateMethodTes
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.memory.storageFraction", "0")
- .set("spark.testing.memory", "1000")
+ .set(TEST_MEMORY, 1000L)
val mm = UnifiedMemoryManager(conf, numCores = 2)
val ms = makeMemoryStore(mm)
val memoryMode = MemoryMode.ON_HEAP
@@ -285,7 +286,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite
with PrivateMethodTes
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.memory.storageFraction", "0")
- .set("spark.testing.memory", "1000")
+ .set(TEST_MEMORY, 1000L)
val mm = UnifiedMemoryManager(conf, numCores = 2)
makeBadMemoryStore(mm)
val memoryMode = MemoryMode.ON_HEAP
@@ -306,9 +307,9 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite
with PrivateMethodTes
test("not enough free memory in the storage pool --OFF_HEAP") {
val conf = new SparkConf()
- .set(MEMORY_OFFHEAP_SIZE.key, "1000")
- .set("spark.testing.memory", "1000")
- .set(MEMORY_OFFHEAP_ENABLED.key, "true")
+ .set(MEMORY_OFFHEAP_SIZE, 1000L)
+ .set(TEST_MEMORY, 1000L)
+ .set(MEMORY_OFFHEAP_ENABLED, true)
val taskAttemptId = 0L
val mm = UnifiedMemoryManager(conf, numCores = 1)
val ms = makeMemoryStore(mm)
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index 36dd620a56853..112fd31a060e6 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import scala.util.Random
import org.apache.spark._
+import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
@@ -76,7 +77,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with
LocalSparkContext {
test("throw exception on barrier() call timeout") {
val conf = new SparkConf()
.set("spark.barrier.sync.timeout", "1")
- .set("spark.test.noStageRetry", "true")
+ .set(TEST_NO_STAGE_RETRY, true)
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
@@ -101,7 +102,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with
LocalSparkContext {
test("throw exception if barrier() call doesn't happen on every task") {
val conf = new SparkConf()
.set("spark.barrier.sync.timeout", "1")
- .set("spark.test.noStageRetry", "true")
+ .set(TEST_NO_STAGE_RETRY, true)
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
@@ -124,7 +125,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with
LocalSparkContext {
test("throw exception if the number of barrier() calls are not the same on
every task") {
val conf = new SparkConf()
.set("spark.barrier.sync.timeout", "1")
- .set("spark.test.noStageRetry", "true")
+ .set(TEST_NO_STAGE_RETRY, true)
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index 29bb8232f44f5..2215f7f366213 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -20,6 +20,7 @@ import scala.concurrent.duration._
import org.apache.spark._
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests._
class BlacklistIntegrationSuite extends
SchedulerIntegrationSuite[MultiExecutorMockBackend]{
@@ -58,9 +59,9 @@ class BlacklistIntegrationSuite extends
SchedulerIntegrationSuite[MultiExecutorM
extraConfs = Seq(
config.BLACKLIST_ENABLED.key -> "true",
config.MAX_TASK_FAILURES.key -> "4",
- "spark.testing.nHosts" -> "2",
- "spark.testing.nExecutorsPerHost" -> "5",
- "spark.testing.nCoresPerExecutor" -> "10"
+ TEST_N_HOSTS.key -> "2",
+ TEST_N_EXECUTORS_HOST.key -> "5",
+ TEST_N_CORES_EXECUTOR.key -> "10"
)
) {
// To reliably reproduce the failure that would occur without
blacklisting, we have to use 1
@@ -102,9 +103,9 @@ class BlacklistIntegrationSuite extends
SchedulerIntegrationSuite[MultiExecutorM
"SPARK-15865 Progress with fewer executors than maxTaskFailures",
extraConfs = Seq(
config.BLACKLIST_ENABLED.key -> "true",
- "spark.testing.nHosts" -> "2",
- "spark.testing.nExecutorsPerHost" -> "1",
- "spark.testing.nCoresPerExecutor" -> "1",
+ TEST_N_HOSTS.key -> "2",
+ TEST_N_EXECUTORS_HOST.key -> "1",
+ TEST_N_CORES_EXECUTOR.key -> "1",
"spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "0s"
)
) {
@@ -129,9 +130,9 @@ class MultiExecutorMockBackend(
conf: SparkConf,
taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler)
{
- val nHosts = conf.getInt("spark.testing.nHosts", 5)
- val nExecutorsPerHost = conf.getInt("spark.testing.nExecutorsPerHost", 4)
- val nCoresPerExecutor = conf.getInt("spark.testing.nCoresPerExecutor", 2)
+ val nHosts = conf.get(TEST_N_HOSTS)
+ val nExecutorsPerHost = conf.get(TEST_N_EXECUTORS_HOST)
+ val nCoresPerExecutor = conf.get(TEST_N_CORES_EXECUTOR)
override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = {
(0 until nHosts).flatMap { hostIdx =>
diff --git
a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
index b9f0e873375b0..43621cb85762c 100644
---
a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
+++
b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.mockito.MockitoSugar
import org.apache.spark._
import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.memory._
import org.apache.spark.unsafe.Platform
@@ -33,8 +34,8 @@ class ShuffleExternalSorterSuite extends SparkFunSuite with
LocalSparkContext wi
val conf = new SparkConf()
.setMaster("local[1]")
.setAppName("ShuffleExternalSorterSuite")
- .set("spark.testing", "true")
- .set("spark.testing.memory", "1600")
+ .set(IS_TESTING, true)
+ .set(TEST_MEMORY, 1600L)
.set("spark.memory.fraction", "1")
sc = new SparkContext(conf)
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 19116cf22d2f8..480e07fb9399a 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DRIVER_PORT, MEMORY_OFFHEAP_SIZE}
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -69,8 +70,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
protected def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
- conf.set("spark.testing.memory", maxMem.toString)
- conf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
+ conf.set(TEST_MEMORY, maxMem)
+ conf.set(MEMORY_OFFHEAP_SIZE, maxMem)
val transfer = new NettyBlockTransferService(conf, securityMgr,
"localhost", "localhost", 0, 1)
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
@@ -87,7 +88,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
conf.set("spark.authenticate", "false")
conf.set(DRIVER_PORT, rpcEnv.address.port)
- conf.set("spark.testing", "true")
+ conf.set(IS_TESTING, true)
conf.set("spark.memory.fraction", "1")
conf.set("spark.memory.storageFraction", "1")
conf.set("spark.storage.unrollFraction", "0.4")
@@ -233,7 +234,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
val failableTransfer = mock(classOf[BlockTransferService]) // this wont
actually work
when(failableTransfer.hostName).thenReturn("some-hostname")
when(failableTransfer.port).thenReturn(1000)
- conf.set("spark.testing.memory", "10000")
+ conf.set(TEST_MEMORY, 10000L)
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
val failableStore = new BlockManager("failable-store", rpcEnv, master,
serializerManager, conf,
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a7bb2a03360aa..bda81365b0792 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.{BlockDataManager, BlockTransferService,
TransportContext}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
@@ -89,8 +90,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers
with BeforeAndAfterE
transferService: Option[BlockTransferService] = Option.empty,
testConf: Option[SparkConf] = None): BlockManager = {
val bmConf = testConf.map(_.setAll(conf.getAll)).getOrElse(conf)
- bmConf.set("spark.testing.memory", maxMem.toString)
- bmConf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
+ bmConf.set(TEST_MEMORY, maxMem)
+ bmConf.set(MEMORY_OFFHEAP_SIZE, maxMem)
val serializer = new KryoSerializer(bmConf)
val encryptionKey = if (bmConf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(bmConf))
@@ -115,11 +116,10 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with BeforeAndAfterE
System.setProperty("os.arch", "amd64")
conf = new SparkConf(false)
.set("spark.app.id", "test")
- .set("spark.testing", "true")
+ .set(IS_TESTING, true)
.set("spark.memory.fraction", "1")
.set("spark.memory.storageFraction", "1")
.set("spark.kryoserializer.buffer", "1m")
- .set("spark.test.useCompressedOops", "true")
.set("spark.storage.unrollFraction", "0.4")
.set("spark.storage.unrollMemoryThreshold", "512")
@@ -901,7 +901,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers
with BeforeAndAfterE
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
- conf.set("spark.testing.memory", "1200")
+ conf.set(TEST_MEMORY, 1200L)
val transfer = new NettyBlockTransferService(conf, securityMgr,
"localhost", "localhost", 0, 1)
val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(new JavaSerializer(conf),
conf)
diff --git
a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 7274072e5049a..3d5dc01d9d7b6 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -39,7 +39,6 @@ class MemoryStoreSuite
with ResetSystemProperties {
var conf: SparkConf = new SparkConf(false)
- .set("spark.test.useCompressedOops", "true")
.set("spark.storage.unrollFraction", "0.4")
.set("spark.storage.unrollMemoryThreshold", "512")
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 63f9f82adf3e0..8bc62db81e4f9 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.config.Tests.TEST_USE_COMPRESSED_OOPS_KEY
class DummyClass1 {}
@@ -76,7 +77,7 @@ class SizeEstimatorSuite
// Set the arch to 64-bit and compressedOops to true to get a
deterministic test-case
super.beforeEach()
System.setProperty("os.arch", "amd64")
- System.setProperty("spark.test.useCompressedOops", "true")
+ System.setProperty(TEST_USE_COMPRESSED_OOPS_KEY, "true")
}
override def afterEach(): Unit = {
@@ -192,7 +193,7 @@ class SizeEstimatorSuite
// (Sun vs IBM). Use a DummyString class to make tests deterministic.
test("64-bit arch with no compressed oops") {
System.setProperty("os.arch", "amd64")
- System.setProperty("spark.test.useCompressedOops", "false")
+ System.setProperty(TEST_USE_COMPRESSED_OOPS_KEY, "false")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
diff --git
a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 35fba1a3b73c6..6211399005e1a 100644
---
a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++
b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark._
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.io.CompressionCodec
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.util.CompletionIterator
@@ -552,7 +553,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
val conf = createSparkConf(loadDefaults = false)
.set("spark.shuffle.memoryFraction", "0.01")
.set("spark.memory.useLegacyMode", "true")
- .set("spark.testing.memory", "100000000")
+ .set(TEST_MEMORY, 100000000L)
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
sc = new SparkContext("local", "test", conf)
val N = 2e5.toInt
diff --git
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 47173b89e91e2..aa400dd74e9ca 100644
---
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import org.apache.spark._
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.unsafe.array.LongArray
@@ -639,7 +640,7 @@ class ExternalSorterSuite extends SparkFunSuite with
LocalSparkContext {
val conf = createSparkConf(loadDefaults = false, kryo = false)
.set("spark.shuffle.memoryFraction", "0.01")
.set("spark.memory.useLegacyMode", "true")
- .set("spark.testing.memory", "100000000")
+ .set(TEST_MEMORY, 100000000L)
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
sc = new SparkContext("local", "test", conf)
val N = 2e5.toInt
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index c0b435efb8c9c..cc89683949010 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.IS_TESTING
private[spark] class KubernetesTestComponents(defaultClient:
DefaultKubernetesClient) {
@@ -67,7 +68,7 @@ private[spark] class KubernetesTestComponents(defaultClient:
DefaultKubernetesCl
.set("spark.executors.instances", "1")
.set("spark.app.name", "spark-test-app")
.set("spark.ui.enabled", "true")
- .set("spark.testing", "false")
+ .set(IS_TESTING, false)
.set("spark.kubernetes.submission.waitAppCompletion", "false")
.set("spark.kubernetes.authenticate.driver.serviceAccountName",
serviceAccountName)
}
diff --git
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index d0174516c2361..6dfc73dc91fae 100644
---
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{SecurityManager, SparkConf,
SparkContext, SparkExceptio
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -298,7 +299,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
protected def driverURL: String = {
- if (conf.contains("spark.testing")) {
+ if (conf.contains(IS_TESTING)) {
"driverURL"
} else {
RpcEndpointAddress(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index dda7cb55f5395..5b38fe5c46bbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
-import org.apache.spark.SparkContext
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd,
SparkListenerSQLExecutionStart}
@@ -38,7 +38,7 @@ object SQLExecution {
executionIdToQueryExecution.get(executionId)
}
- private val testing = sys.props.contains("spark.testing")
+ private val testing = sys.props.contains(IS_TESTING.key)
private[sql] def checkSQLExecutionId(sparkSession: SparkSession): Unit = {
val sc = sparkSession.sparkContext
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
index d95794d624033..c37d663941d8d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter,
CodeGenerator}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}
@@ -29,7 +30,7 @@ abstract class BenchmarkQueryTest extends QueryTest with
SharedSQLContext with B
// When Utils.isTesting is true, the RuleExecutor will issue an exception
when hitting
// the max iteration of analyzer/optimizer batches.
- assert(Utils.isTesting, "spark.testing is not set to true")
+ assert(Utils.isTesting, s"${IS_TESTING.key} is not set to true")
/**
* Drop all the tables
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index ca8692290edb2..963e42517b441 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
File}
import java.util.Properties
import org.apache.spark._
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
@@ -99,7 +100,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with
LocalSparkSession {
val conf = new SparkConf()
.set("spark.shuffle.spill.initialMemoryThreshold", "1")
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
- .set("spark.testing.memory", "80000")
+ .set(TEST_MEMORY, 80000L)
spark =
SparkSession.builder().master("local").appName("test").config(conf).getOrCreate()
val outputFile = File.createTempFile("test-unsafe-row-serializer-spill",
"")
outputFile.deleteOnExit()
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]