Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/17521#discussion_r109486681
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala ---
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import java.util.{NoSuchElementException, Properties, TimeZone}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{ConfigEntry, ConfigReader, OptionalConfigEntry}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.util.Utils
+
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+// This file defines the configuration options for Catalyst
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+
+object CatalystConf {
+ import ConfEntries.{buildConf, buildStaticConf}
+
+ val OPTIMIZER_MAX_ITERATIONS = buildConf("spark.sql.optimizer.maxIterations")
+ .internal()
+ .doc("The max number of iterations the optimizer and analyzer runs.")
+ .intConf
+ .createWithDefault(100)
+
+ val OPTIMIZER_INSET_CONVERSION_THRESHOLD =
+ buildConf("spark.sql.optimizer.inSetConversionThreshold")
+ .internal()
+ .doc("The threshold of set size for InSet conversion.")
+ .intConf
+ .createWithDefault(10)
+
+ val MAX_CASES_BRANCHES = buildConf("spark.sql.codegen.maxCaseBranches")
+ .internal()
+ .doc("The maximum number of switches supported with codegen.")
+ .intConf
+ .createWithDefault(20)
+
+ val ORDER_BY_ORDINAL = buildConf("spark.sql.orderByOrdinal")
+ .doc("When true, the ordinal numbers are treated as the position in
the select list. " +
+ "When false, the ordinal numbers in order/sort by clause are
ignored.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val GROUP_BY_ORDINAL = buildConf("spark.sql.groupByOrdinal")
+ .doc("When true, the ordinal numbers in group by clauses are treated
as the position " +
+ "in the select list. When false, the ordinal numbers are ignored.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val CASE_SENSITIVE = buildConf("spark.sql.caseSensitive")
+ .internal()
+ .doc("Whether the query analyzer should be case sensitive or not. " +
+ "Default to case insensitive. It is highly discouraged to turn on
case sensitive mode.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val RUN_SQL_ON_FILES = buildConf("spark.sql.runSQLOnFiles")
+ .internal()
+ .doc("When true, we could use `datasource`.`path` as table in SQL
query.")
+ .booleanConf
+ .createWithDefault(true)
+
+ // Use lazy val here, as the default timezone may change after `CatalystConf` is instantiated.
+ // TODO: we may need a `ConfigEntryWithDefaultFunction` so that the default value of session
+ // location timezone is always the default timezone at the time we call `getConf`.
+ lazy val SESSION_LOCAL_TIMEZONE =
+ buildConf("spark.sql.session.timeZone")
+ .doc("""The ID of session local timezone, e.g. "GMT",
"America/Los_Angeles", etc.""")
+ .stringConf
+ .createWithDefault(TimeZone.getDefault().getID())
+
+ val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
+ .doc("When false, we will throw an error if a query contains a
cartesian product without " +
+ "explicit CROSS JOIN syntax.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val CBO_ENABLED =
+ buildConf("spark.sql.cbo.enabled")
+ .doc("Enables CBO for estimation of plan statistics when set true.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val JOIN_REORDER_ENABLED =
+ buildConf("spark.sql.cbo.joinReorder.enabled")
+ .doc("Enables join reorder in CBO.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val JOIN_REORDER_DP_THRESHOLD =
+ buildConf("spark.sql.cbo.joinReorder.dp.threshold")
+ .doc("The maximum number of joined nodes allowed in the dynamic
programming algorithm.")
+ .intConf
+ .checkValue(number => number > 0, "The maximum number must be a positive integer.")
+ .createWithDefault(12)
+
+ val JOIN_REORDER_CARD_WEIGHT =
+ buildConf("spark.sql.cbo.joinReorder.card.weight")
+ .internal()
+ .doc("The weight of cardinality (number of rows) for plan cost
comparison in join reorder: " +
+ "rows * weight + size * (1 - weight).")
+ .doubleConf
+ .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
+ .createWithDefault(0.7)
+
+ val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection")
+ .doc("When true, it enables join reordering based on star schema
detection. ")
+ .booleanConf
+ .createWithDefault(false)
+
+ val STARSCHEMA_FACT_TABLE_RATIO = buildConf("spark.sql.cbo.starJoinFTRatio")
+ .internal()
+ .doc("Specifies the upper limit of the ratio between the largest fact
tables" +
+ " for a star join to be considered. ")
+ .doubleConf
+ .createWithDefault(0.9)
+
+ val MAX_NESTED_VIEW_DEPTH =
+ buildConf("spark.sql.view.maxNestedViewDepth")
+ .internal()
+ .doc("The maximum depth of a view reference in a nested view. A
nested view may reference " +
+ "other nested views, the dependencies are organized in a directed
acyclic graph (DAG). " +
+ "However the DAG depth may become too large and cause unexpected
behavior. This " +
+ "configuration puts a limit on this: when the depth of a view
exceeds this value during " +
+ "analysis, we terminate the resolution to avoid potential errors.")
+ .intConf
+ .checkValue(depth => depth > 0, "The maximum depth of a view reference in a nested view " +
+ "must be positive.")
+ .createWithDefault(100)
+
+ val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
+ .internal()
+ .doc("When true, the query optimizer will infer and propagate data
constraints in the query " +
+ "plan to optimize them. Constraint propagation can sometimes be
computationally expensive" +
+ "for certain kinds of query plans (such as those with a large number
of predicates and " +
+ "aliases) which might negatively impact overall runtime.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val NDV_MAX_ERROR =
+ buildConf("spark.sql.statistics.ndv.maxError")
+ .internal()
+ .doc("The maximum estimation error allowed in HyperLogLog++
algorithm when generating " +
+ "column level statistics.")
+ .doubleConf
+ .createWithDefault(0.05)
+
+ object STATIC {
+ val FILESOURCE_TABLE_RELATION_CACHE_SIZE =
+ buildStaticConf("spark.sql.filesourceTableRelationCacheSize")
+ .internal()
+ .doc("The maximum size of the cache that maps qualified table
names to table relation " +
+ "plans.")
+ .intConf
+ .checkValue(cacheSize => cacheSize >= 0, "The cache size can not be negative")
+ .createWithDefault(1000)
+
+ val WAREHOUSE_PATH = buildStaticConf("spark.sql.warehouse.dir")
+ .doc("The default location for managed databases and tables.")
+ .stringConf
+ .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
+ }
+}
+
+/**
+ * A class that enables the setting and getting of mutable config parameters/hints.
+ *
+ * CatalystConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
+ */
+class CatalystConf extends Serializable with Logging {
+ /** ********************** configuration functionality methods ************ */
+ import ConfEntries._
+
+ /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
+ @transient private[spark] val settings = java.util.Collections.synchronizedMap(
+ new java.util.HashMap[String, String]())
+
+ @transient private val reader = new ConfigReader(settings)
+
+ /** Set Spark SQL configuration properties. */
+ def setConf(props: Properties): Unit = settings.synchronized {
+ props.asScala.foreach { case (k, v) => setConfString(k, v) }
+ }
+
+ /** Set the given Spark SQL configuration property using a `string` value. */
+ def setConfString(key: String, value: String): Unit = {
+ require(key != null, "key cannot be null")
+ require(value != null, s"value cannot be null for key: $key")
+ val entry = normalConfEntries.get(key)
+ if (entry != null) {
+ // Only verify for built-in configurations.
+ entry.valueConverter(value)
+ }
+ setConfInternal(key, value)
+ }
+
+ /** Set the given Spark SQL configuration property. */
+ def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
+ require(entry != null, "entry cannot be null")
+ require(value != null, s"value cannot be null for key: ${entry.key}")
+ require(normalConfEntries.get(entry.key) == entry, s"$entry is not registered")
+ setConfInternal(entry.key, entry.stringConverter(value))
+ }
+
+ /** Return the value of Spark SQL configuration property for the given key. */
+ @throws[NoSuchElementException]("if key is not set")
+ def getConfString(key: String): String = {
+ Option(settings.get(key)).orElse {
+ // Try to use the default value
+ Option(normalConfEntries.get(key)).map(_.defaultValueString)
+ }.getOrElse(throw new NoSuchElementException(key))
+ }
+
+ /**
+ * Return the value of Spark SQL configuration property for the given key. If the key is not set
+ * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
+ * desired one.
+ */
+ def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
+ require(normalConfEntries.get(entry.key) == entry, s"$entry is not registered")
+ Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue)
+ }
+
+ /**
+ * Return the value of Spark SQL configuration property for the given key. If the key is not set
+ * yet, return `defaultValue` in [[ConfigEntry]].
+ */
+ def getConf[T](entry: ConfigEntry[T]): T = {
+ require(normalConfEntries.get(entry.key) == entry, s"$entry is not registered")
+ entry.readFrom(reader)
+ }
+
+ /**
+ * Return the value of an optional Spark SQL configuration property for the given key. If the key
+ * is not set yet, returns None.
+ */
+ def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = {
+ require(normalConfEntries.get(entry.key) == entry, s"$entry is not registered")
+ entry.readFrom(reader)
+ }
+
+ /**
+ * Return the `string` value of Spark SQL configuration property for the given key. If the key is
+ * not set yet, return `defaultValue`.
+ */
+ def getConfString(key: String, defaultValue: String): String = {
+ val entry = normalConfEntries.get(key)
+ if (entry != null && defaultValue != "<undefined>") {
+ // Only verify for built-in configurations.
+ entry.valueConverter(defaultValue)
+ }
+ Option(settings.get(key)).getOrElse(defaultValue)
+ }
+
+ /**
+ * Return all the configuration properties that have been set (i.e. not the default).
+ * This creates a new copy of the config properties in the form of a Map.
+ */
+ def getAllConfs: Map[String, String] =
+ settings.synchronized { settings.asScala.toMap }
+
+ /**
+ * Return all the built-in configuration definitions. Each definition contains key, defaultValue
+ * and doc.
+ */
+ def getAllDefinedConfs: Seq[(String, String, String)] = normalConfEntries.synchronized {
+ normalConfEntries.values.asScala.filter(_.isPublic).map { entry =>
+ (entry.key, getConfString(entry.key, entry.defaultValueString), entry.doc)
+ }.toSeq
+ }
+
+ /**
+ * Return whether a given key is set in this [[CatalystConf]].
+ */
+ def contains(key: String): Boolean = {
+ settings.containsKey(key)
+ }
+
+ private def setConfInternal(key: String, value: String): Unit = {
+ settings.put(key, value)
+ }
+
+ def unsetConf(key: String): Unit = {
+ settings.remove(key)
+ }
+
+ def unsetConf(entry: ConfigEntry[_]): Unit = {
+ settings.remove(entry.key)
+ }
+
+ def clear(): Unit = {
+ settings.clear()
+ }
+
+ override def clone(): CatalystConf = {
+ val result = new CatalystConf
+ getAllConfs.foreach {
+ case(k, v) => if (v ne null) result.setConfString(k, v)
+ }
+ result
+ }
+
+ // For test only
+ private[spark] def copy[T](entry: ConfigEntry[T], value: T): CatalystConf = {
--- End diff ---
Most of the content in this file is copied from `SQLConf`, except the `lazy
val` for session local timezone, and the 3 `copy` methods here.
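
For anyone skimming the hunk, here is a rough usage sketch of the get/set surface quoted above. It is an illustration only, not part of the patch, and it assumes the entries defined in `object CatalystConf` are registered through `ConfEntries.buildConf` (as the imports suggest), so the `require(... is not registered)` checks pass:

```scala
// Hypothetical REPL/test-style sketch mirroring the API shown in this diff.
import org.apache.spark.sql.catalyst.CatalystConf

val conf = new CatalystConf

// A typed entry that has not been set falls back to its default value.
assert(!conf.getConf(CatalystConf.CASE_SENSITIVE)) // default: false

// Set a typed entry, then read it back either as its type or by raw key.
conf.setConf(CatalystConf.CASE_SENSITIVE, true)
assert(conf.getConf(CatalystConf.CASE_SENSITIVE))
assert(conf.getConfString("spark.sql.caseSensitive") == "true")

// String-keyed writes are validated against the registered entry's converter.
conf.setConfString("spark.sql.optimizer.maxIterations", "50")
assert(conf.getConf(CatalystConf.OPTIMIZER_MAX_ITERATIONS) == 50)

// clone() carries over only the explicitly-set values into a new conf.
val copied = conf.clone()
assert(copied.getAllConfs == conf.getAllConfs)
```

Note that string-keyed access only validates values that map to a registered built-in entry, so free-form keys still pass through unchecked, which appears to be the same behavior as `SQLConf`.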