gengliangwang commented on a change in pull request #35982:
URL: https://github.com/apache/spark/pull/35982#discussion_r837645169



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala
##########
@@ -15,139 +15,385 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.catalyst.util
-
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.catalog.{SessionCatalog, UnresolvedCatalogRelation}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
-/**
- * This object contains fields to help process DEFAULT columns.
- */
-object ResolveDefaultColumns {
-  // This column metadata indicates the default value associated with a particular table column that
-  // is in effect at any given time. Its value begins at the time of the initial CREATE/REPLACE
-  // TABLE statement with DEFAULT column definition(s), if any. It then changes whenever an ALTER
-  // TABLE statement SETs the DEFAULT. The intent is for this "current default" to be used by
-  // UPDATE, INSERT and MERGE, which evaluate each default expression for each row.
-  val CURRENT_DEFAULT_COLUMN_METADATA_KEY = "CURRENT_DEFAULT"
-  // This column metadata represents the default value for all existing rows in a table after a
-  // column has been added. This value is determined at time of CREATE TABLE, REPLACE TABLE, or
-  // ALTER TABLE ADD COLUMN, and never changes thereafter. The intent is for this "exist default"
-  // to be used by any scan when the columns in the source row are missing data. For example,
-  // consider the following sequence:
-  // CREATE TABLE t (c1 INT)
-  // INSERT INTO t VALUES (42)
-  // ALTER TABLE t ADD COLUMNS (c2 INT DEFAULT 43)
-  // SELECT c1, c2 FROM t
-  // In this case, the final query is expected to return 42, 43. The ALTER TABLE ADD COLUMNS command
-  // executed after there was already data in the table, so in order to enforce this invariant,
-  // we need either (1) an expensive backfill of value 43 at column c2 into all previous rows, or
-  // (2) indicate to each data source that selected columns missing data are to generate the
-  // corresponding DEFAULT value instead. We choose option (2) for efficiency, and represent this
-  // value as the text representation of a folded constant in the "EXISTS_DEFAULT" column metadata.
-  val EXISTS_DEFAULT_COLUMN_METADATA_KEY = "EXISTS_DEFAULT"
-
+package org.apache.spark.sql.catalyst.analysis {
   /**
-   * Finds "current default" expressions in CREATE/REPLACE TABLE columns and constant-folds them.
-   *
-   * The results are stored in the "exists default" metadata of the same columns. For example, in
-   * the event of this statement:
+   * This is a rule to process DEFAULT columns in statements such as CREATE/REPLACE TABLE.
    *
-   * CREATE TABLE T(a INT, b INT DEFAULT 5 + 5)
+   * Background: CREATE TABLE and ALTER TABLE invocations support setting column default values for
+   * later operations. Following INSERT and MERGE commands may then reference the value
+   * using the DEFAULT keyword as needed.
    *
-   * This method constant-folds the "current default" value, stored in the CURRENT_DEFAULT metadata
-   * of the "b" column, to "10", storing the result in the "exists default" value within the
-   * EXISTS_DEFAULT metadata of that same column. Meanwhile the "current default" metadata of this
-   * "b" column retains its original value of "5 + 5".
+   * Example:
+   * CREATE TABLE T(a INT DEFAULT 4, b INT NOT NULL DEFAULT 5);
+   * INSERT INTO T VALUES (1, 2);
+   * INSERT INTO T VALUES (1, DEFAULT);
+   * INSERT INTO T VALUES (DEFAULT, 6);
+   * SELECT * FROM T;
+   * (1, 2)
+   * (1, 5)
+   * (4, 6)
    *
-   * The reason for constant-folding the EXISTS_DEFAULT is to make the end-user visible behavior the
-   * same, after executing an ALTER TABLE ADD COLUMNS command with DEFAULT value, as if the system
-   * had performed an exhaustive backfill of the provided value to all previously existing rows in
-   * the table instead. We choose to avoid doing such a backfill because it would be a
-   * time-consuming and costly operation. Instead, we elect to store the EXISTS_DEFAULT in the
-   * column metadata for future reference when querying data out of the data source. In turn, each
-   * data source then takes responsibility to provide the constant-folded value in the
-   * EXISTS_DEFAULT metadata for such columns where the value is not present in storage.
-   *
-   * @param analyzer used for analyzing the result of parsing the column expression stored as text.
-   * @param tableSchema represents the names and types of the columns of the statement to process.
-   * @param statementType name of the statement being processed, such as INSERT; useful for errors.
-   * @return a copy of `tableSchema` with field metadata updated with the constant-folded values.
+   * @param analyzer analyzer to use for processing DEFAULT values stored as text.
+   * @param catalog  the catalog to use for looking up the schema of INSERT INTO table objects.
+   * @param insert   the enclosing INSERT statement for which this rule is processing the query, if
+   *                 any.
    */
-  def constantFoldCurrentDefaultsToExistDefaults(
+  case class ResolveDefaultColumns(
       analyzer: Analyzer,
-      tableSchema: StructType,
-      statementType: String): StructType = {
-    if (!SQLConf.get.enableDefaultColumns) {
-      return tableSchema
+      catalog: SessionCatalog,
+      insert: Option[InsertIntoStatement] = None) extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      (_ => SQLConf.get.enableDefaultColumns), ruleId) {
+      case i@InsertIntoStatement(_, _, _, _, _, _)
+        if i.query.collectFirst { case u: UnresolvedInlineTable => u }.isDefined =>
+        // Create a helper instance of this same rule with the `insert` argument populated as `i`.
+        // Then recursively apply it on the `query` of `i` to transform the result. This recursive
+        // application lets the below case matching against `UnresolvedInlineTable` trigger
+        // anywhere that operator may appear in the descendants of `i.query`.
+        val helper = ResolveDefaultColumns(analyzer, catalog, Some(i))

Review comment:
       I am thinking about setting a TreeNodeTag with `i` so that we don't need the recursion here.
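
For context on the behavior the deleted comment block describes: the "exists default" mechanism lets a scan substitute a stored per-column default for rows written before the column existed, instead of backfilling storage. A minimal standalone sketch of that fill step (plain Python, not Spark code; `fill_missing` and the `(name, exists_default)` schema pairs are hypothetical simplifications):

```python
# Standalone sketch of "exists default" behavior (option (2) in the deleted
# comment): a scan pads older rows with each missing column's stored default.

def fill_missing(row, schema):
    """Pad a row with per-column exists defaults; `schema` is a list of
    (column_name, exists_default) pairs (a hypothetical simplification)."""
    return [row[i] if i < len(row) else default
            for i, (_, default) in enumerate(schema)]

# CREATE TABLE t (c1 INT); INSERT INTO t VALUES (42);
# ALTER TABLE t ADD COLUMNS (c2 INT DEFAULT 43); SELECT c1, c2 FROM t
schema = [("c1", None), ("c2", 43)]
old_row = [42]  # written before c2 existed, so storage has no c2 value
print(fill_missing(old_row, schema))  # [42, 43]
```

As the deleted comment notes, this per-scan substitution is chosen over an exhaustive backfill purely for efficiency; the query result is the same either way.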
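
The constant-folding step the deleted doc comment describes (turning the `b INT DEFAULT 5 + 5` example's CURRENT_DEFAULT text into the stored EXISTS_DEFAULT text "10") can also be sketched outside Spark. The metadata key names below mirror the diff; the `fold` helper is a hypothetical stand-in for Catalyst's `ConstantFolding` pass, handling additions only:

```python
import ast

# Metadata keys mirroring the diff; the folding itself is a toy stand-in.
CURRENT_DEFAULT = "CURRENT_DEFAULT"
EXISTS_DEFAULT = "EXISTS_DEFAULT"

def fold(text):
    """Constant-fold a textual arithmetic expression (additions only, as a sketch)."""
    def go(node):
        if isinstance(node, ast.Constant):
            return node.value
        if isinstance(node, ast.BinOp) and isinstance(node.op, ast.Add):
            return go(node.left) + go(node.right)
        raise ValueError("unsupported expression")
    return go(ast.parse(text, mode="eval").body)

def fold_current_default(metadata):
    """Store the folded CURRENT_DEFAULT text under EXISTS_DEFAULT,
    leaving CURRENT_DEFAULT itself unchanged."""
    return {**metadata, EXISTS_DEFAULT: str(fold(metadata[CURRENT_DEFAULT]))}

# CREATE TABLE T(a INT, b INT DEFAULT 5 + 5): column b's metadata becomes
meta = fold_current_default({CURRENT_DEFAULT: "5 + 5"})
print(meta)  # {'CURRENT_DEFAULT': '5 + 5', 'EXISTS_DEFAULT': '10'}
```

Keeping both keys matters: CURRENT_DEFAULT stays live for later INSERT/UPDATE/MERGE evaluation, while the pre-folded EXISTS_DEFAULT gives data sources a cheap constant for rows missing the column.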




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


