Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/18555#discussion_r129946652
--- Diff: core/src/test/scala/org/apache/spark/SparkConfSuite.scala ---
@@ -322,6 +324,291 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
conf.validateSettings()
}
+ test("verify spark.blockManager.port configuration") {
+ val conf = new SparkConf(false)
+ .setMaster("local").setAppName("My app")
+
+ conf.validateSettings()
+ assert(!conf.contains(BLOCK_MANAGER_PORT.key))
+
+ Seq(
+ "0", // normal values
+ "1024", // min values
+ "65535" // max values
+ ).foreach { value =>
+ conf.set(BLOCK_MANAGER_PORT.key, value)
+ val sc0 = new SparkContext(conf)
+ assert(sc0.isStopped === false)
+ assert(sc0.conf.get(BLOCK_MANAGER_PORT) === value.toInt)
+ sc0.stop()
+ conf.remove(BLOCK_MANAGER_PORT)
+ }
+
+ // Verify abnormal values
+ Seq(
+ "-1",
+ "1000",
+ "65536"
+ ).foreach { value =>
+ conf.set(BLOCK_MANAGER_PORT.key, value)
+ val excMsg = intercept[IllegalArgumentException] {
+ new SparkContext(conf)
+ }.getMessage
+ // Caused by: java.lang.IllegalArgumentException:
+ // blockManager port should be between 1024 and 65535 (inclusive),
+ // or 0 for a random free port.
+ assert(excMsg.contains("blockManager port should be between 1024 " +
+ "and 65535 (inclusive), or 0 for a random free port."))
+
+ conf.remove(BLOCK_MANAGER_PORT)
+ }
+ }
+
+ test("verify spark.executor.memory configuration exception") {
+ val conf = new SparkConf(false)
+ .setMaster("local").setAppName("executor memory")
+ .set(EXECUTOR_MEMORY.key, "-1")
+ val excMsg = intercept[NumberFormatException] {
+ sc = new SparkContext(conf)
+ }.getMessage
+ // Caused by: java.lang.NumberFormatException:
+ // Size must be specified as bytes (b), kibibytes (k),
+ // mebibytes (m), gibibytes (g), tebibytes (t),
+ // or pebibytes(p). E.g. 50b, 100k, or 250m.
+ assert(excMsg.contains("Size must be specified as bytes (b), kibibytes
(k), " +
+ "mebibytes (m), gibibytes (g), tebibytes (t), or pebibytes(p). E.g.
50b, 100k, or 250m."))
+ }
+
+ test("verify spark.task.cpus configuration exception") {
+ val conf = new SparkConf(false)
+ .setMaster("local").setAppName("cpus")
+ .set(CPUS_PER_TASK.key, "-1")
+ val excMsg = intercept[IllegalArgumentException] {
+ sc = new SparkContext(conf)
+ }.getMessage
+ // Caused by: java.lang.IllegalArgumentException:
+ // Number of cores to allocate for task event queue must be positive.
+ assert(excMsg.contains("Number of cores to allocate for task event
queue must be positive."))
+ }
+
+ test("verify spark.task.maxFailures configuration exception") {
+ val conf = new SparkConf(false)
+ .setMaster("local").setAppName("task maxFailures")
+ .set(MAX_TASK_FAILURES.key, "-1")
+ val sc0 = new SparkContext(conf)
+ val excMsg = intercept[IllegalArgumentException] {
+ new TaskSchedulerImpl(sc0)
+ }.getMessage
+ // Caused by: java.lang.IllegalArgumentException:
+ // The retry times of task should be greater than or equal to 1.
+ assert(excMsg.contains("The retry times of task should be greater than
or equal to 1."))
+ sc0.stop()
+ }
+
+ test("verify listenerbus.eventqueue.capacity configuration exception") {
+ val conf = new SparkConf(false)
+ .setMaster("local").setAppName("capacity")
+ .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY.key, "-1")
+ val excMsg = intercept[IllegalArgumentException] {
+ sc = new SparkContext(conf)
+ }.getMessage
+ // Caused by: java.lang.IllegalArgumentException:
+ // The capacity of listener bus event queue must be positive.
+ assert(excMsg.contains("The capacity of listener bus event queue must
be positive."))
+ }
+
+ test("verify metrics.maxListenerClassesTimed configuration exception") {
+ val conf = new SparkConf(false)
+ .setMaster("local").setAppName("listenerbus")
+ .set(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED.key, "-1")
+ val excMsg = intercept[IllegalArgumentException] {
+ sc = new SparkContext(conf)
+ }.getMessage
+ // Caused by: java.lang.IllegalArgumentException:
+ // The maxListenerClassesTimed of listener bus event queue must be positive.
+ assert(excMsg.contains("The maxListenerClassesTimed of listener bus " +
+ "event queue must be positive."))
+ }
+
+ test("verify spark.ui.retained configuration exception") {
+ val conf = new SparkConf(false)
+ .setMaster("local").setAppName("retained")
+ .set(UI_RETAINED_TASKS.key, "-1")
+ val excMsg = intercept[IllegalArgumentException] {
+ sc = new SparkContext(conf)
+ }.getMessage
+ // Caused by: java.lang.IllegalArgumentException:
+ // Number of Tasks event queue must be positive.
+ assert(excMsg.contains("Number of Tasks event queue must be
positive."))
+ }
+
+ test("verify spark.files.maxPartitionBytes configuration exception") {
+ val conf = new SparkConf(false)
+ .setMaster("local").setAppName("maxPartitionBytes")
+ .set(FILES_MAX_PARTITION_BYTES.key, "-1")
+ sc = new SparkContext(conf)
+ val tempDir = Utils.createTempDir()
+ val binaryRDD = sc.binaryFiles(tempDir.getAbsolutePath)
+ val excMsg = intercept[IllegalArgumentException] {
+ // Computing the RDD partitions should fail
+ binaryRDD.partitions
+ }.getMessage
+ // Caused by: java.lang.IllegalArgumentException:
+ // The maximum number of bytes event queue must be positive.
+ assert(excMsg.contains("The maximum number of bytes event queue must
be positive."))
+ }
+
+ test("verify spark.files.openCostInBytes configuration exception") {
+ val conf = new SparkConf(false)
+ .setMaster("local").setAppName("openCostInBytes")
+ .set(FILES_OPEN_COST_IN_BYTES.key, "-1")
+ sc = new SparkContext(conf)
+ val tempDir = Utils.createTempDir()
+ val binaryRDD = sc.binaryFiles(tempDir.getAbsolutePath)
+ val excMsg = intercept[IllegalArgumentException] {
+ // Computing the RDD partitions should fail
+ binaryRDD.partitions
+ }.getMessage
+ // Caused by: java.lang.IllegalArgumentException:
+ // The number of CostInBytes event queue must be positive.
+ assert(excMsg.contains("The number of CostInBytes event queue must be
positive."))
+ }
+
+ test("verify spark.shuffle.accurateBlockThreshold configuration
exception") {
+ val conf = new SparkConf(false)
+ .set("spark.serializer", classOf[KryoSerializer].getName)
+ .setMaster("local").setAppName("accurateBlockThreshold")
+ .set(SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, "-1")
+ sc = new SparkContext(conf)
+ val rdd = sc.parallelize(0 until 3000, 10).repartition(2001)
+ val excMsg = intercept[SparkException] {
+ rdd.collect().length
+ }.getMessage
+ // Caused by: java.lang.IllegalArgumentException:
+ // The number of BlockThreshold event queue must be positive.
+ assert(excMsg.contains("The number of BlockThreshold event queue must
be positive."))
+ }
+
+ test("verify spark.shuffle.registration.timeout configuration
exception") {
+ val conf = new SparkConf(false)
+ .setMaster("local").setAppName("timeout")
+ .set(SHUFFLE_SERVICE_ENABLED, true)
+ .set(SHUFFLE_REGISTRATION_TIMEOUT.key, "-1")
+ val excMsg = intercept[IllegalArgumentException] {
+ sc = new SparkContext(conf)
+ }.getMessage
+ // Caused by: java.lang.IllegalArgumentException:
+ // Timeout in milliseconds for registration event queue must be positive.
+ assert(excMsg.contains("Timeout in milliseconds for registration " +
+ "event queue must be positive."))
+ conf.remove(SHUFFLE_SERVICE_ENABLED)
+ }
+
+ test("verify spark.reducer.maxReqSizeShuffleToMem configuration
exception") {
+ val conf = new SparkConf(false)
+ .setMaster("local").setAppName("maxReqSizeShuffleToMem")
+ .set(REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM.key, "-1")
+ sc = new SparkContext(conf)
+ val a = sc.parallelize(1 to 5, 201)
+ val b = a.map(x => (x, x*2))
+ val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(201))
+ .setSerializer(new KryoSerializer(conf))
+ val excMsg = intercept[SparkException] {
+ c.collect()
+ }.getMessage
+ // Caused by: java.lang.IllegalArgumentException:
+ // Size of the request is above this threshold event queue must be positive.
+ assert(excMsg.contains("Size of the request is above this threshold " +
+ "event queue must be positive."))
+ }
+
+ test("verify shuffle file buffer size configuration") {
+ val conf = new SparkConf(false)
+
+ // 2097151: Int.MaxValue / 1024
+ val maxValue = 2097151L
+
+ Seq(
+ SHUFFLE_FILE_BUFFER_SIZE,
+ SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE
+ ).foreach { config =>
+ assert(!conf.contains(config))
+
+ // min values
+ conf.set(config.key, "1k")
+ assert(conf.get(config) === 1)
+
+ // max values
+ conf.set(config.key, s"${maxvalue.toString}k")
+ assert(conf.get(config) === maxvalue)
+
+ // Verify exception values
+ Seq(
+ "0k",
+ "-1k",
+ s"${(maxvalue + 1).toString}k"
+ ).foreach { value =>
+ conf.set(config.key, value)
+ val sc0 = new SparkContext("local", "test", conf)
+ val a = sc0.parallelize(1 to 5, 201)
+ val b = a.map(x => (x, x*2))
+ val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(201))
+ .setSerializer(new KryoSerializer(conf))
+
+ val excMsg = intercept[SparkException] {
+ c.collect()
+ }.getMessage
+ // Caused by: java.lang.IllegalArgumentException:
+ // The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024},
+ // or: The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.
+ assert(excMsg.contains("buffer size must be greater than 0 " +
+ s"and less than ${Int.MaxValue / 1024}."))
+ sc0.stop()
+ conf.remove(config)
+ }
+ }
+ }
+
+ test("verify spark.shuffle.spill.diskWriteBufferSize configuration") {
+ val conf = new SparkConf(false)
+
+ // 2147483647: Int.MaxValue
+ val maxValue = 2147483647L
+
+ assert(!conf.contains(SHUFFLE_DISK_WRITE_BUFFER_SIZE))
+
+ // min values
+ conf.set(SHUFFLE_DISK_WRITE_BUFFER_SIZE.key, "1024B")
+ assert(conf.get(SHUFFLE_DISK_WRITE_BUFFER_SIZE) === 1024)
+
+ // max values
+ conf.set(SHUFFLE_DISK_WRITE_BUFFER_SIZE.key, s"${maxvalue.toString}B")
+ assert(conf.get(SHUFFLE_DISK_WRITE_BUFFER_SIZE) === maxvalue)
+
+ // Verify exception values
+ Seq(
+ "0B",
+ "-1B",
+ s"${(maxvalue + 1).toString}B"
+ ).foreach { value =>
+ conf.set(SHUFFLE_DISK_WRITE_BUFFER_SIZE.key, value)
+ val sc0 = new SparkContext("local", "test", conf)
+ val a = sc0.parallelize(1 to 5, 201)
+ val b = a.map(x => (x, x*2))
+ val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(201))
+ .setSerializer(new KryoSerializer(conf))
+
+ val excMsg = intercept[SparkException] {
+ c.collect()
+ }.getMessage
+ // Caused by: java.lang.IllegalArgumentException:
+ // The buffer size must be greater than 0 and less than ${Int.MaxValue}.
+ assert(excMsg.contains("The buffer size must be greater than 0 " +
+ s"and less than ${Int.MaxValue}."))
+ sc0.stop()
+ conf.remove(SHUFFLE_DISK_WRITE_BUFFER_SIZE)
+ }
+ }
--- End diff ---
Thanks for adding these test cases! Could you do me a favor and run each
test case on a separate branch, without any other changes, and post the
current behaviors?
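For example, something like the following should run a single case via sbt (a
sketch assuming the usual Spark build setup; the -z filter string is only
illustrative):

    build/sbt 'core/testOnly *SparkConfSuite -- -z "spark.blockManager.port"'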