gengliangwang commented on a change in pull request #35982:
URL: https://github.com/apache/spark/pull/35982#discussion_r837645169
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala
##########
@@ -15,139 +15,385 @@
* limitations under the License.
*/
-package org.apache.spark.sql.catalyst.util
-
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.catalog.{SessionCatalog,
UnresolvedCatalogRelation}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-/**
- * This object contains fields to help process DEFAULT columns.
- */
-object ResolveDefaultColumns {
- // This column metadata indicates the default value associated with a
particular table column that
- // is in effect at any given time. Its value begins at the time of the
initial CREATE/REPLACE
- // TABLE statement with DEFAULT column definition(s), if any. It then
changes whenever an ALTER
- // TABLE statement SETs the DEFAULT. The intent is for this "current
default" to be used by
- // UPDATE, INSERT and MERGE, which evaluate each default expression for each
row.
- val CURRENT_DEFAULT_COLUMN_METADATA_KEY = "CURRENT_DEFAULT"
- // This column metadata represents the default value for all existing rows
in a table after a
- // column has been added. This value is determined at time of CREATE TABLE,
REPLACE TABLE, or
- // ALTER TABLE ADD COLUMN, and never changes thereafter. The intent is for
this "exist default"
- // to be used by any scan when the columns in the source row are missing
data. For example,
- // consider the following sequence:
- // CREATE TABLE t (c1 INT)
- // INSERT INTO t VALUES (42)
- // ALTER TABLE t ADD COLUMNS (c2 INT DEFAULT 43)
- // SELECT c1, c2 FROM t
- // In this case, the final query is expected to return 42, 43. The ALTER
TABLE ADD COLUMNS command
- // executed after there was already data in the table, so in order to
enforce this invariant,
- // we need either (1) an expensive backfill of value 43 at column c2 into
all previous rows, or
- // (2) indicate to each data source that selected columns missing data are
to generate the
- // corresponding DEFAULT value instead. We choose option (2) for efficiency,
and represent this
- // value as the text representation of a folded constant in the
"EXISTS_DEFAULT" column metadata.
- val EXISTS_DEFAULT_COLUMN_METADATA_KEY = "EXISTS_DEFAULT"
-
+package org.apache.spark.sql.catalyst.analysis {
/**
- * Finds "current default" expressions in CREATE/REPLACE TABLE columns and
constant-folds them.
- *
- * The results are stored in the "exists default" metadata of the same
columns. For example, in
- * the event of this statement:
+ * This is a rule to process DEFAULT columns in statements such as
CREATE/REPLACE TABLE.
*
- * CREATE TABLE T(a INT, b INT DEFAULT 5 + 5)
+ * Background: CREATE TABLE and ALTER TABLE invocations support setting
column default values for
+ * later operations. Subsequent INSERT and MERGE commands may then
reference the value
+ * using the DEFAULT keyword as needed.
*
- * This method constant-folds the "current default" value, stored in the
CURRENT_DEFAULT metadata
- * of the "b" column, to "10", storing the result in the "exists default"
value within the
- * EXISTS_DEFAULT metadata of that same column. Meanwhile the "current
default" metadata of this
- * "b" column retains its original value of "5 + 5".
+ * Example:
+ * CREATE TABLE T(a INT DEFAULT 4, b INT NOT NULL DEFAULT 5);
+ * INSERT INTO T VALUES (1, 2);
+ * INSERT INTO T VALUES (1, DEFAULT);
+ * INSERT INTO T VALUES (DEFAULT, 6);
+ * SELECT * FROM T;
+ * (1, 2)
+ * (1, 5)
+ * (4, 6)
*
- * The reason for constant-folding the EXISTS_DEFAULT is to make the
end-user visible behavior the
- * same, after executing an ALTER TABLE ADD COLUMNS command with DEFAULT
value, as if the system
- * had performed an exhaustive backfill of the provided value to all
previously existing rows in
- * the table instead. We choose to avoid doing such a backfill because it
would be a
- * time-consuming and costly operation. Instead, we elect to store the
EXISTS_DEFAULT in the
- * column metadata for future reference when querying data out of the data
source. In turn, each
- * data source then takes responsibility to provide the constant-folded
value in the
- * EXISTS_DEFAULT metadata for such columns where the value is not present
in storage.
- *
- * @param analyzer used for analyzing the result of parsing the column
expression stored as text.
- * @param tableSchema represents the names and types of the columns of the
statement to process.
- * @param statementType name of the statement being processed, such as
INSERT; useful for errors.
- * @return a copy of `tableSchema` with field metadata updated with the
constant-folded values.
+ * @param analyzer analyzer to use for processing DEFAULT values stored as
text.
+ * @param catalog the catalog to use for looking up the schema of INSERT
INTO table objects.
+ * @param insert the enclosing INSERT statement for which this rule is
processing the query, if
+ * any.
*/
- def constantFoldCurrentDefaultsToExistDefaults(
+ case class ResolveDefaultColumns(
analyzer: Analyzer,
- tableSchema: StructType,
- statementType: String): StructType = {
- if (!SQLConf.get.enableDefaultColumns) {
- return tableSchema
+ catalog: SessionCatalog,
+ insert: Option[InsertIntoStatement] = None) extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsWithPruning(
+ (_ => SQLConf.get.enableDefaultColumns), ruleId) {
+ case i@InsertIntoStatement(_, _, _, _, _, _)
+ if i.query.collectFirst { case u: UnresolvedInlineTable => u
}.isDefined =>
+ // Create a helper instance of this same rule with the `insert`
argument populated as `i`.
+ // Then recursively apply it on the `query` of `i` to transform the
result. This recursive
+ // application allows the below case matching against
`UnresolvedInlineTable` to trigger
+ // anywhere that operator may appear in the descendants of `i.query`.
+ val helper = ResolveDefaultColumns(analyzer, catalog, Some(i))
Review comment:
I am thinking about setting a TreeNodeTag with `i` so that we don't need
the recursion here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]