Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/6747#discussion_r32287446
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala ---
@@ -25,74 +25,281 @@ import scala.collection.JavaConversions._
import org.apache.spark.sql.catalyst.CatalystConf
private[spark] object SQLConf {
-  val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
-  val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
-  val IN_MEMORY_PARTITION_PRUNING = "spark.sql.inMemoryColumnarStorage.partitionPruning"
-  val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
-  val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
-  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
-  val CODEGEN_ENABLED = "spark.sql.codegen"
-  val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
-  val DIALECT = "spark.sql.dialect"
-  val CASE_SENSITIVE = "spark.sql.caseSensitive"
-
-  val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
-  val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp"
-  val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
-  val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
-  val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
-  val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"
-
-  val ORC_FILTER_PUSHDOWN_ENABLED = "spark.sql.orc.filterPushdown"
-
-  val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"
-
-  val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
-  val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
+
+  private val sqlConfEntries = java.util.Collections.synchronizedMap(
+    new java.util.HashMap[String, SQLConfEntry[_]]())
+
+  private[sql] class SQLConfEntry[T] private(
+      val key: String,
+      val defaultValue: Option[T],
+      val valueConverter: String => T,
+      val stringConverter: T => String,
+      val doc: String,
+      val isPublic: Boolean) {
+
+    def defaultValueString: String = defaultValue.map(stringConverter).getOrElse("<undefined>")
+
+    override def toString: String = {
+      // Fail tests that use `SQLConfEntry` as a string.
+      throw new IllegalStateException("Force to fail tests")
+      // s"SQLConfEntry(key = $key, defaultValue=$defaultValueString, doc=$doc,
+      //   isPublic = $isPublic)"
+    }
+  }
+
+  private[sql] object SQLConfEntry {
+
+    private def apply[T](
+        key: String,
+        defaultValue: Option[T],
+        valueConverter: String => T,
+        stringConverter: T => String,
+        doc: String,
+        isPublic: Boolean): SQLConfEntry[T] =
+      sqlConfEntries.synchronized {
+        if (sqlConfEntries.containsKey(key)) {
+          throw new IllegalArgumentException(s"Duplicate SQLConfEntry. $key has been registered")
+        }
+        val entry =
+          new SQLConfEntry[T](key, defaultValue, valueConverter, stringConverter, doc, isPublic)
+        sqlConfEntries.put(key, entry)
+        entry
+      }
+
+    def intConf(
+        key: String,
+        defaultValue: Option[Int] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Int] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toInt
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"$key should be int, but was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def longConf(
+        key: String,
+        defaultValue: Option[Long] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Long] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toLong
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"$key should be long, but was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def doubleConf(
+        key: String,
+        defaultValue: Option[Double] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Double] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toDouble
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"$key should be double, but was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def booleanConf(
+        key: String,
+        defaultValue: Option[Boolean] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Boolean] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toBoolean
+        } catch {
+          case _: IllegalArgumentException =>
+            throw new IllegalArgumentException(s"$key should be boolean, but was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def stringConf(
+        key: String,
+        defaultValue: Option[String] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[String] =
+      SQLConfEntry(key, defaultValue, v => v, v => v, doc, isPublic)
+  }
+
+  import SQLConfEntry._
+
+  val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
+    defaultValue = Some(true),
+    doc = "When set to true, Spark SQL will automatically select a compression codec for each " +
+      "column based on statistics of the data.")
+
+  val COLUMN_BATCH_SIZE = intConf("spark.sql.inMemoryColumnarStorage.batchSize",
+    defaultValue = Some(10000),
+    doc = "Controls the size of batches for columnar caching. Larger batch sizes can improve " +
+      "memory utilization and compression, but risk OOMs when caching data.")
+
+  val IN_MEMORY_PARTITION_PRUNING =
+    booleanConf("spark.sql.inMemoryColumnarStorage.partitionPruning",
+      defaultValue = Some(false),
+      doc = "<TODO>")
+
+  val AUTO_BROADCASTJOIN_THRESHOLD = intConf("spark.sql.autoBroadcastJoinThreshold",
+    defaultValue = Some(10 * 1024 * 1024),
+    doc = "Configures the maximum size in bytes for a table that will be broadcast to all worker " +
+      "nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
+      "Note that currently statistics are only supported for Hive Metastore tables where the " +
+      "command <code>ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan</code> has been run.")
+
+  val DEFAULT_SIZE_IN_BYTES = longConf("spark.sql.defaultSizeInBytes", isPublic = false)
+
+  val SHUFFLE_PARTITIONS = intConf("spark.sql.shuffle.partitions",
+    defaultValue = Some(200),
+    doc = "Configures the number of partitions to use when shuffling data for joins or " +
+      "aggregations.")
+
+  val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
+    defaultValue = Some(false),
+    doc = "When true, code will be dynamically generated at runtime for expression evaluation in" +
+      " a specific query. For some queries with complicated expressions this option can lead to " +
+      "significant speed-ups. However, for simple queries this can actually slow down query " +
+      "execution.")
+
+  val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
+    defaultValue = Some(false),
+    doc = "<TODO>")
+
+  val DIALECT = stringConf("spark.sql.dialect", defaultValue = Some("sql"), doc = "<TODO>")
+
+  val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive",
+    defaultValue = Some(true),
+    doc = "<TODO>")
+
+  val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
+    defaultValue = Some(false),
+    doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
+      "Spark SQL, do not differentiate between binary data and strings when writing out the " +
+      "Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
+      "compatibility with these systems.")
+
+  val PARQUET_INT96_AS_TIMESTAMP = booleanConf("spark.sql.parquet.int96AsTimestamp",
+    defaultValue = Some(true),
+    doc = "Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
+      "Spark would also store Timestamp as INT96 because we need to avoid precision loss of the " +
+      "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
+      "provide compatibility with these systems.")
+
+  val PARQUET_CACHE_METADATA = booleanConf("spark.sql.parquet.cacheMetadata",
+    defaultValue = Some(true),
+    doc = "Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
+
+  val PARQUET_COMPRESSION = stringConf("spark.sql.parquet.compression.codec",
--- End diff ---
Could we also have an entry type for configs like these that validates the
option is in some list?
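
Something along the lines of the following could work, reusing the pattern of the factories above. This is a rough, untested sketch: the name `enumConf`, its exact signature, and the message wording are placeholders, not part of this patch. It would live next to `intConf`/`stringConf` inside the `SQLConfEntry` object, so it can call the private `apply`:

```scala
def enumConf[T](
    key: String,
    valueConverter: String => T,
    validValues: Set[T],
    defaultValue: Option[T] = None,
    doc: String = "",
    isPublic: Boolean = true): SQLConfEntry[T] =
  SQLConfEntry(key, defaultValue, v => {
    // Convert first, then reject anything outside the allowed set.
    val value = valueConverter(v)
    if (!validValues.contains(value)) {
      throw new IllegalArgumentException(
        s"The value of $key should be one of ${validValues.mkString(", ")}, but was $v")
    }
    value
  }, _.toString, doc, isPublic)
```

Then entries like `PARQUET_COMPRESSION` would validate on set instead of failing later at write time (codec list and default below are illustrative):

```scala
val PARQUET_COMPRESSION = enumConf("spark.sql.parquet.compression.codec",
  valueConverter = v => v.toLowerCase,
  validValues = Set("uncompressed", "snappy", "gzip", "lzo"),
  defaultValue = Some("gzip"),
  doc = "Sets the compression codec used when writing Parquet files.")
```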
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]