dtenedor commented on code in PR #38823:
URL: https://github.com/apache/spark/pull/38823#discussion_r1116029029


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.errors.QueryCompilationErrors.toSQLId
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * This object contains utility methods and values for Generated Columns

Review Comment:
   Please expand this to provide more background on what generated columns are, 
the SQL syntax used to specify them, and links to other files or places where 
the reader can learn more.



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -571,6 +571,11 @@
     ],
     "sqlState" : "42809"
   },
+  "GENERATED_COLUMN_WITH_DEFAULT_VALUE" : {

Review Comment:
   Nice, thanks for covering the intersection of these two cases!



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.errors.QueryCompilationErrors.toSQLId
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * This object contains utility methods and values for Generated Columns
+ */
+object GeneratedColumn {
+
+  /**
+   * The metadata key for saving a generation expression in a generated 
column's metadata. This is
+   * only used internally and connectors should access generation expressions 
from the V2 columns.
+   */
+  val GENERATION_EXPRESSION_METADATA_KEY = "generationExpression"
+
+  /** Parser for parsing generation expression SQL strings */
+  private lazy val parser = new CatalystSqlParser()
+
+  /**
+   * Whether the given `field` is a generated column
+   */
+  def isGeneratedColumn(field: StructField): Boolean = {
+    field.metadata.contains(GENERATION_EXPRESSION_METADATA_KEY)
+  }
+
+  /**
+   * Returns the generation expression stored in the column metadata if it 
exists
+   */
+  def getGenerationExpression(field: StructField): Option[String] = {
+    if (isGeneratedColumn(field)) {
+      Some(field.metadata.getString(GENERATION_EXPRESSION_METADATA_KEY))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Whether the `schema` has one or more generated columns
+   */
+  def hasGeneratedColumns(schema: StructType): Boolean = {
+    schema.exists(isGeneratedColumn)
+  }
+
+  /**
+   * Parse and analyze `expressionStr` and perform verification. This means:
+   * - The expression cannot refer to itself
+   * - No user-defined expressions
+   *
+   * Throws an [[AnalysisException]] if the expression cannot be converted or 
is an invalid
+   * generation expression according to the above rules.
+   */
+  private def analyzeAndVerifyExpression(
+    expressionStr: String,
+    fieldName: String,
+    schema: StructType,
+    statementType: String): Unit = {
+    // Parse the expression string
+    val parsed: Expression = try {
+      parser.parseExpression(expressionStr)
+    } catch {
+      case ex: ParseException =>
+        // Shouldn't be possible since we check that the expression is a valid 
catalyst expression
+        // during parsing
+        throw new AnalysisException(
+          s"Failed to execute $statementType command because the column 
$fieldName has " +
+            s"generation expression $expressionStr which fails to parse as a 
valid expression:" +
+            s"\n${ex.getMessage}")
+    }
+    // Analyze the parse result
+    // Generated column can't reference itself
+    val relation = new LocalRelation(StructType(schema.filterNot(_.name == 
fieldName)).toAttributes)
+    val plan = try {
+      val analyzer: Analyzer = GeneratedColumnAnalyzer
+      val analyzed = analyzer.execute(Project(Seq(Alias(parsed, fieldName)()), 
relation))
+      analyzer.checkAnalysis(analyzed)
+      analyzed
+    } catch {
+      case ex: AnalysisException =>
+        // Improve error message if possible
+        if (ex.getErrorClass == "UNRESOLVED_COLUMN.WITH_SUGGESTION") {

Review Comment:
   You can pattern-match on the error class with an `@` binder, e.g.
   
   ```
   case ex @ AnalysisException(_, _, _, _, _, 
"UNRESOLVED_COLUMN.WITH_SUGGESTION", params, _)
   ```
   
   and add the self-reference check as a pattern guard, e.g.
   
   ```
   if ex.messageParameters.get("objectName").contains(toSQLId(fieldName))
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.errors.QueryCompilationErrors.toSQLId
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * This object contains utility methods and values for Generated Columns
+ */
+object GeneratedColumn {
+
+  /**
+   * The metadata key for saving a generation expression in a generated 
column's metadata. This is
+   * only used internally and connectors should access generation expressions 
from the V2 columns.
+   */
+  val GENERATION_EXPRESSION_METADATA_KEY = "generationExpression"
+
+  /** Parser for parsing generation expression SQL strings */
+  private lazy val parser = new CatalystSqlParser()
+
+  /**
+   * Whether the given `field` is a generated column
+   */
+  def isGeneratedColumn(field: StructField): Boolean = {
+    field.metadata.contains(GENERATION_EXPRESSION_METADATA_KEY)
+  }
+
+  /**
+   * Returns the generation expression stored in the column metadata if it 
exists
+   */
+  def getGenerationExpression(field: StructField): Option[String] = {
+    if (isGeneratedColumn(field)) {
+      Some(field.metadata.getString(GENERATION_EXPRESSION_METADATA_KEY))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Whether the `schema` has one or more generated columns
+   */
+  def hasGeneratedColumns(schema: StructType): Boolean = {
+    schema.exists(isGeneratedColumn)
+  }
+
+  /**
+   * Parse and analyze `expressionStr` and perform verification. This means:
+   * - The expression cannot refer to itself
+   * - No user-defined expressions

Review Comment:
   We ended up banning subquery expressions in column defaults as well. The 
semantics could be complex for generated columns, so you may want to think 
about whether to allow them here.



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala:
##########
@@ -1422,6 +1422,97 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("SPARK-41290: Generated columns only allowed with TableCatalogs that " +
+    "SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS") {
+    val tblName = "my_tab"
+    val tableDefinition =
+      s"$tblName(eventDate DATE, eventYear INT GENERATED ALWAYS AS 
(year(eventDate)))"
+    for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) {
+      // InMemoryTableCatalog.capabilities() = 
{SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS}
+      withTable(s"testcat.$tblName") {
+        if (statement == "REPLACE TABLE") {
+          spark.sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo")
+        }
+        // Can create table with a generated column
+        spark.sql(s"$statement testcat.$tableDefinition USING foo")

Review Comment:
   Please check the results of the query here as well.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.errors.QueryCompilationErrors.toSQLId
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * This object contains utility methods and values for Generated Columns
+ */
+object GeneratedColumn {
+
+  /**
+   * The metadata key for saving a generation expression in a generated 
column's metadata. This is
+   * only used internally and connectors should access generation expressions 
from the V2 columns.
+   */
+  val GENERATION_EXPRESSION_METADATA_KEY = "generationExpression"
+
+  /** Parser for parsing generation expression SQL strings */
+  private lazy val parser = new CatalystSqlParser()
+
+  /**
+   * Whether the given `field` is a generated column
+   */
+  def isGeneratedColumn(field: StructField): Boolean = {
+    field.metadata.contains(GENERATION_EXPRESSION_METADATA_KEY)
+  }
+
+  /**
+   * Returns the generation expression stored in the column metadata if it 
exists
+   */
+  def getGenerationExpression(field: StructField): Option[String] = {
+    if (isGeneratedColumn(field)) {
+      Some(field.metadata.getString(GENERATION_EXPRESSION_METADATA_KEY))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Whether the `schema` has one or more generated columns
+   */
+  def hasGeneratedColumns(schema: StructType): Boolean = {
+    schema.exists(isGeneratedColumn)
+  }
+
+  /**
+   * Parse and analyze `expressionStr` and perform verification. This means:
+   * - The expression cannot refer to itself
+   * - No user-defined expressions
+   *
+   * Throws an [[AnalysisException]] if the expression cannot be converted or 
is an invalid
+   * generation expression according to the above rules.
+   */
+  private def analyzeAndVerifyExpression(
+    expressionStr: String,
+    fieldName: String,
+    schema: StructType,
+    statementType: String): Unit = {
+    // Parse the expression string
+    val parsed: Expression = try {
+      parser.parseExpression(expressionStr)
+    } catch {
+      case ex: ParseException =>
+        // Shouldn't be possible since we check that the expression is a valid 
catalyst expression
+        // during parsing
+        throw new AnalysisException(
+          s"Failed to execute $statementType command because the column 
$fieldName has " +
+            s"generation expression $expressionStr which fails to parse as a 
valid expression:" +
+            s"\n${ex.getMessage}")
+    }
+    // Analyze the parse result
+    // Generated column can't reference itself
+    val relation = new LocalRelation(StructType(schema.filterNot(_.name == 
fieldName)).toAttributes)
+    val plan = try {
+      val analyzer: Analyzer = GeneratedColumnAnalyzer
+      val analyzed = analyzer.execute(Project(Seq(Alias(parsed, fieldName)()), 
relation))
+      analyzer.checkAnalysis(analyzed)
+      analyzed
+    } catch {
+      case ex: AnalysisException =>
+        // Improve error message if possible
+        if (ex.getErrorClass == "UNRESOLVED_COLUMN.WITH_SUGGESTION") {
+          ex.messageParameters.get("objectName").filter(_ == 
toSQLId(fieldName)).foreach { _ =>
+            // Generation expression references itself
+            throw new AnalysisException(
+              errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
+              messageParameters = Map(
+                "fieldName" -> fieldName,
+                "expressionStr" -> expressionStr,
+                "reason" -> "generation expression cannot reference itself",
+                "errorMessage" -> ex.getMessage))
+          }
+        }
+        if (ex.getErrorClass == "UNRESOLVED_ROUTINE") {
+          // Cannot resolve function using built-in catalog
+          ex.messageParameters.get("routineName").foreach { fnName =>
+            throw new AnalysisException(
+              errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
+              messageParameters = Map(
+                "fieldName" -> fieldName,
+                "expressionStr" -> expressionStr,
+                "reason" -> s"failed to resolve $fnName to a built-in 
function",
+                "errorMessage" -> ex.getMessage))
+          }
+        }
+        throw new AnalysisException(

Review Comment:
   +1



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.errors.QueryCompilationErrors.toSQLId
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * This object contains utility methods and values for Generated Columns
+ */
+object GeneratedColumn {
+
+  /**
+   * The metadata key for saving a generation expression in a generated 
column's metadata. This is
+   * only used internally and connectors should access generation expressions 
from the V2 columns.
+   */
+  val GENERATION_EXPRESSION_METADATA_KEY = "generationExpression"
+
+  /** Parser for parsing generation expression SQL strings */
+  private lazy val parser = new CatalystSqlParser()
+
+  /**
+   * Whether the given `field` is a generated column
+   */
+  def isGeneratedColumn(field: StructField): Boolean = {
+    field.metadata.contains(GENERATION_EXPRESSION_METADATA_KEY)
+  }
+
+  /**
+   * Returns the generation expression stored in the column metadata if it 
exists
+   */
+  def getGenerationExpression(field: StructField): Option[String] = {
+    if (isGeneratedColumn(field)) {
+      Some(field.metadata.getString(GENERATION_EXPRESSION_METADATA_KEY))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Whether the `schema` has one or more generated columns
+   */
+  def hasGeneratedColumns(schema: StructType): Boolean = {
+    schema.exists(isGeneratedColumn)
+  }
+
+  /**
+   * Parse and analyze `expressionStr` and perform verification. This means:
+   * - The expression cannot refer to itself
+   * - No user-defined expressions
+   *
+   * Throws an [[AnalysisException]] if the expression cannot be converted or 
is an invalid
+   * generation expression according to the above rules.
+   */
+  private def analyzeAndVerifyExpression(
+    expressionStr: String,
+    fieldName: String,
+    schema: StructType,
+    statementType: String): Unit = {
+    // Parse the expression string
+    val parsed: Expression = try {
+      parser.parseExpression(expressionStr)
+    } catch {
+      case ex: ParseException =>
+        // Shouldn't be possible since we check that the expression is a valid 
catalyst expression
+        // during parsing
+        throw new AnalysisException(
+          s"Failed to execute $statementType command because the column 
$fieldName has " +
+            s"generation expression $expressionStr which fails to parse as a 
valid expression:" +
+            s"\n${ex.getMessage}")
+    }
+    // Analyze the parse result
+    // Generated column can't reference itself
+    val relation = new LocalRelation(StructType(schema.filterNot(_.name == 
fieldName)).toAttributes)
+    val plan = try {
+      val analyzer: Analyzer = GeneratedColumnAnalyzer
+      val analyzed = analyzer.execute(Project(Seq(Alias(parsed, fieldName)()), 
relation))
+      analyzer.checkAnalysis(analyzed)
+      analyzed
+    } catch {
+      case ex: AnalysisException =>
+        // Improve error message if possible
+        if (ex.getErrorClass == "UNRESOLVED_COLUMN.WITH_SUGGESTION") {
+          ex.messageParameters.get("objectName").filter(_ == 
toSQLId(fieldName)).foreach { _ =>
+            // Generation expression references itself
+            throw new AnalysisException(
+              errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
+              messageParameters = Map(
+                "fieldName" -> fieldName,
+                "expressionStr" -> expressionStr,
+                "reason" -> "generation expression cannot reference itself",
+                "errorMessage" -> ex.getMessage))

Review Comment:
   No, it is not.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.errors.QueryCompilationErrors.toSQLId
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * This object contains utility methods and values for Generated Columns
+ */
+object GeneratedColumn {
+
+  /**
+   * The metadata key for saving a generation expression in a generated 
column's metadata. This is
+   * only used internally and connectors should access generation expressions 
from the V2 columns.
+   */
+  val GENERATION_EXPRESSION_METADATA_KEY = "generationExpression"
+
+  /** Parser for parsing generation expression SQL strings */
+  private lazy val parser = new CatalystSqlParser()
+
+  /**
+   * Whether the given `field` is a generated column
+   */
+  def isGeneratedColumn(field: StructField): Boolean = {
+    field.metadata.contains(GENERATION_EXPRESSION_METADATA_KEY)
+  }
+
+  /**
+   * Returns the generation expression stored in the column metadata if it 
exists
+   */
+  def getGenerationExpression(field: StructField): Option[String] = {
+    if (isGeneratedColumn(field)) {
+      Some(field.metadata.getString(GENERATION_EXPRESSION_METADATA_KEY))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Whether the `schema` has one or more generated columns
+   */
+  def hasGeneratedColumns(schema: StructType): Boolean = {
+    schema.exists(isGeneratedColumn)
+  }
+
+  /**
+   * Parse and analyze `expressionStr` and perform verification. This means:
+   * - The expression cannot refer to itself
+   * - No user-defined expressions
+   *
+   * Throws an [[AnalysisException]] if the expression cannot be converted or 
is an invalid
+   * generation expression according to the above rules.
+   */
+  private def analyzeAndVerifyExpression(
+    expressionStr: String,
+    fieldName: String,
+    schema: StructType,
+    statementType: String): Unit = {
+    // Parse the expression string
+    val parsed: Expression = try {
+      parser.parseExpression(expressionStr)
+    } catch {
+      case ex: ParseException =>
+        // Shouldn't be possible since we check that the expression is a valid 
catalyst expression
+        // during parsing
+        throw new AnalysisException(
+          s"Failed to execute $statementType command because the column 
$fieldName has " +
+            s"generation expression $expressionStr which fails to parse as a 
valid expression:" +
+            s"\n${ex.getMessage}")
+    }
+    // Analyze the parse result
+    // Generated column can't reference itself
+    val relation = new LocalRelation(StructType(schema.filterNot(_.name == 
fieldName)).toAttributes)
+    val plan = try {
+      val analyzer: Analyzer = GeneratedColumnAnalyzer
+      val analyzed = analyzer.execute(Project(Seq(Alias(parsed, fieldName)()), 
relation))
+      analyzer.checkAnalysis(analyzed)
+      analyzed
+    } catch {
+      case ex: AnalysisException =>
+        // Improve error message if possible
+        if (ex.getErrorClass == "UNRESOLVED_COLUMN.WITH_SUGGESTION") {
+          ex.messageParameters.get("objectName").filter(_ == 
toSQLId(fieldName)).foreach { _ =>

Review Comment:
   Is this case-sensitive? Please add a comment, and update the logic if not.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * This object contains utility methods and values for Generated Columns
+ */
+object GeneratedColumn {
+
+  /** The metadata key for saving a generation expression in a generated 
column's metadata */
+  val GENERATION_EXPRESSION_METADATA_KEY = "generationExpression"

Review Comment:
   For consistency with other metadata keys, we could name this in all capitals 
with underscores separating words, e.g. "GENERATION_EXPRESSION".



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala:
##########
@@ -2717,4 +2717,21 @@ class DDLParserSuite extends AnalysisTest {
       context = ExpectedContext(
         fragment = "b STRING COMMENT \"abc\" NOT NULL COMMENT \"abc\"", start 
= 27, stop = 71))
   }
+
+  test("SPARK-41290: implement parser support for GENERATED ALWAYS AS columns 
in tables") {
+    val schemaWithGeneratedColumn = new StructType()
+      .add("a", IntegerType, true)
+      .add("b", IntegerType, false,
+        new MetadataBuilder().putString("generationExpression", "a+1").build())
+    comparePlans(parsePlan(
+      "CREATE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) 
USING parquet"),

Review Comment:
   Can you add some negative tests where parsing fails as well?



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala:
##########
@@ -1422,6 +1422,97 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("SPARK-41290: Generated columns only allowed with TableCatalogs that " +
+    "SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS") {
+    val tblName = "my_tab"
+    val tableDefinition =
+      s"$tblName(eventDate DATE, eventYear INT GENERATED ALWAYS AS 
(year(eventDate)))"
+    for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) {
+      // InMemoryTableCatalog.capabilities() = 
{SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS}
+      withTable(s"testcat.$tblName") {
+        if (statement == "REPLACE TABLE") {
+          spark.sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo")
+        }
+        // Can create table with a generated column
+        spark.sql(s"$statement testcat.$tableDefinition USING foo")
+      }
+      // BasicInMemoryTableCatalog.capabilities() = {}
+      withSQLConf("spark.sql.catalog.dummy" -> 
classOf[BasicInMemoryTableCatalog].getName) {
+        val e = intercept[AnalysisException] {
+          sql("USE dummy")
+          spark.sql(s"$statement dummy.$tableDefinition USING foo")
+        }
+        assert(e.getMessage.contains(
+          "does not support creating generated columns with GENERATED ALWAYS 
AS expressions"))
+        assert(e.getErrorClass == "UNSUPPORTED_FEATURE.TABLE_OPERATION")
+      }
+    }
+  }
+
+  test("SPARK-41290: Column cannot have both a generation expression and a 
default value") {
+    val tblName = "my_tab"
+    val tableDefinition =
+      s"$tblName(eventDate DATE, eventYear INT GENERATED ALWAYS AS 
(year(eventDate)) DEFAULT 0)"
+    withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> "foo") {
+      for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) {
+        withTable(s"testcat.$tblName") {
+          if (statement == "REPLACE TABLE") {
+            spark.sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo")
+          }
+          checkError(
+            exception = intercept[AnalysisException] {
+              spark.sql(s"$statement testcat.$tableDefinition USING foo")
+            },
+            errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE",
+            parameters = Map(
+              "colName" -> "eventYear",
+              "defaultValue" -> "0",
+              "genExpr" -> "year(eventDate)")
+          )
+        }
+      }
+    }
+  }
+
+  test("SPARK-41290: Generated column expression must be valid generation 
expression") {
+    // InMemoryTableCatalog.capabilities() = 
{SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS}
+    val tblName = "my_tab"
+    withTable(s"testcat.$tblName") {
+      // Expression cannot be resolved since it doesn't exist
+      var e = intercept[AnalysisException] {
+        spark.sql(s"CREATE TABLE testcat.$tblName(a INT, " +
+          s"b DATE GENERATED ALWAYS AS (not_a_function(a))) USING foo")
+      }
+      assert(e.getErrorClass == "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN")
+      assert(e.getMessage.contains("failed to resolve `not_a_function` to a 
built-in function"))

Review Comment:
   Please use `checkError` instead of asserting that the message contains a 
substring.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.errors.QueryCompilationErrors.toSQLId
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * This object contains utility methods and values for Generated Columns
+ */
+object GeneratedColumn {
+
+  /**
+   * The metadata key for saving a generation expression in a generated 
column's metadata. This is
+   * only used internally and connectors should access generation expressions 
from the V2 columns.
+   */
+  val GENERATION_EXPRESSION_METADATA_KEY = "generationExpression"
+
+  /** Parser for parsing generation expression SQL strings */
+  private lazy val parser = new CatalystSqlParser()
+
+  /**
+   * Whether the given `field` is a generated column
+   */
+  def isGeneratedColumn(field: StructField): Boolean = {
+    field.metadata.contains(GENERATION_EXPRESSION_METADATA_KEY)
+  }
+
+  /**
+   * Returns the generation expression stored in the column metadata if it 
exists
+   */
+  def getGenerationExpression(field: StructField): Option[String] = {
+    if (isGeneratedColumn(field)) {
+      Some(field.metadata.getString(GENERATION_EXPRESSION_METADATA_KEY))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Whether the `schema` has one or more generated columns
+   */
+  def hasGeneratedColumns(schema: StructType): Boolean = {
+    schema.exists(isGeneratedColumn)
+  }
+
+  /**
+   * Parse and analyze `expressionStr` and perform verification. This means:
+   * - The expression cannot refer to itself
+   * - No user-defined expressions
+   *
+   * Throws an [[AnalysisException]] if the expression cannot be converted or 
is an invalid
+   * generation expression according to the above rules.
+   */
+  private def analyzeAndVerifyExpression(
+    expressionStr: String,
+    fieldName: String,
+    schema: StructType,
+    statementType: String): Unit = {
+    // Parse the expression string
+    val parsed: Expression = try {
+      parser.parseExpression(expressionStr)
+    } catch {
+      case ex: ParseException =>
+        // Shouldn't be possible since we check that the expression is a valid 
catalyst expression
+        // during parsing
+        throw new AnalysisException(
+          s"Failed to execute $statementType command because the column 
$fieldName has " +
+            s"generation expression $expressionStr which fails to parse as a 
valid expression:" +
+            s"\n${ex.getMessage}")
+    }
+    // Analyze the parse result
+    // Generated column can't reference itself

Review Comment:
   Is this case-sensitive? Please mention this in the comment, and update the 
next line accordingly if it is not.



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala:
##########
@@ -1422,6 +1422,97 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("SPARK-41290: Generated columns only allowed with TableCatalogs that " +
+    "SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS") {
+    val tblName = "my_tab"
+    val tableDefinition =
+      s"$tblName(eventDate DATE, eventYear INT GENERATED ALWAYS AS 
(year(eventDate)))"
+    for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) {
+      // InMemoryTableCatalog.capabilities() = 
{SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS}
+      withTable(s"testcat.$tblName") {
+        if (statement == "REPLACE TABLE") {
+          spark.sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo")
+        }
+        // Can create table with a generated column
+        spark.sql(s"$statement testcat.$tableDefinition USING foo")
+      }
+      // BasicInMemoryTableCatalog.capabilities() = {}
+      withSQLConf("spark.sql.catalog.dummy" -> 
classOf[BasicInMemoryTableCatalog].getName) {
+        val e = intercept[AnalysisException] {
+          sql("USE dummy")
+          spark.sql(s"$statement dummy.$tableDefinition USING foo")
+        }
+        assert(e.getMessage.contains(
+          "does not support creating generated columns with GENERATED ALWAYS 
AS expressions"))
+        assert(e.getErrorClass == "UNSUPPORTED_FEATURE.TABLE_OPERATION")
+      }
+    }
+  }
+
+  test("SPARK-41290: Column cannot have both a generation expression and a 
default value") {
+    val tblName = "my_tab"
+    val tableDefinition =
+      s"$tblName(eventDate DATE, eventYear INT GENERATED ALWAYS AS 
(year(eventDate)) DEFAULT 0)"
+    withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> "foo") {
+      for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) {
+        withTable(s"testcat.$tblName") {
+          if (statement == "REPLACE TABLE") {
+            spark.sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo")
+          }
+          checkError(
+            exception = intercept[AnalysisException] {
+              spark.sql(s"$statement testcat.$tableDefinition USING foo")
+            },
+            errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE",
+            parameters = Map(
+              "colName" -> "eventYear",
+              "defaultValue" -> "0",
+              "genExpr" -> "year(eventDate)")
+          )
+        }
+      }
+    }
+  }
+
+  test("SPARK-41290: Generated column expression must be valid generation 
expression") {
+    // InMemoryTableCatalog.capabilities() = 
{SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS}
+    val tblName = "my_tab"
+    withTable(s"testcat.$tblName") {
+      // Expression cannot be resolved since it doesn't exist
+      var e = intercept[AnalysisException] {
+        spark.sql(s"CREATE TABLE testcat.$tblName(a INT, " +
+          s"b DATE GENERATED ALWAYS AS (not_a_function(a))) USING foo")
+      }
+      assert(e.getErrorClass == "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN")
+      assert(e.getMessage.contains("failed to resolve `not_a_function` to a 
built-in function"))
+
+      // Expression cannot be resolved since it's not a built-in function
+      spark.udf.register("timesTwo", (x: Int) => x * 2)
+      e = intercept[AnalysisException] {
+        spark.sql(s"CREATE TABLE testcat.$tblName(a INT, " +
+          s"b INT GENERATED ALWAYS AS (timesTwo(a))) USING foo")
+      }
+      assert(e.getErrorClass == "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN")
+      assert(e.getMessage.contains("failed to resolve `timesTwo` to a built-in 
function"))
+
+      // Generated column can't reference itself
+      e = intercept[AnalysisException] {
+        spark.sql(s"CREATE TABLE testcat.$tblName(a INT, " +
+          s"b INT GENERATED ALWAYS AS (b + 1)) USING foo")
+      }
+      assert(e.getErrorClass == "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN")
+      assert(e.getMessage.contains("generation expression cannot reference 
itself"))
+
+      // Generated column can't reference non-existent column

Review Comment:
   please also add cases where:
   * the expression type is coercible, but not equal to the column type
   * the expression type is not coercible or equal to the column type
   * the expression contains a subquery expression that refers to one of the 
previous columns
   * the expression contains a subquery expression with two levels of nested 
subquery expressions
   * the expression contains a subquery expression that scans from another 
table in the catalog created with a prior CREATE TABLE command
   * the expression refers to one of the columns *after* the `GENERATED ALWAYS 
AS` column



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.errors.QueryCompilationErrors.toSQLId
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * This object contains utility methods and values for Generated Columns
+ */
+object GeneratedColumn {
+
+  /**
+   * The metadata key for saving a generation expression in a generated 
column's metadata. This is
+   * only used internally and connectors should access generation expressions 
from the V2 columns.
+   */
+  val GENERATION_EXPRESSION_METADATA_KEY = "generationExpression"
+
+  /** Parser for parsing generation expression SQL strings */
+  private lazy val parser = new CatalystSqlParser()
+
+  /**
+   * Whether the given `field` is a generated column
+   */
+  def isGeneratedColumn(field: StructField): Boolean = {
+    field.metadata.contains(GENERATION_EXPRESSION_METADATA_KEY)
+  }
+
+  /**
+   * Returns the generation expression stored in the column metadata if it 
exists
+   */
+  def getGenerationExpression(field: StructField): Option[String] = {
+    if (isGeneratedColumn(field)) {
+      Some(field.metadata.getString(GENERATION_EXPRESSION_METADATA_KEY))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Whether the `schema` has one or more generated columns
+   */
+  def hasGeneratedColumns(schema: StructType): Boolean = {
+    schema.exists(isGeneratedColumn)
+  }
+
+  /**
+   * Parse and analyze `expressionStr` and perform verification. This means:
+   * - The expression cannot refer to itself
+   * - No user-defined expressions
+   *
+   * Throws an [[AnalysisException]] if the expression cannot be converted or 
is an invalid
+   * generation expression according to the above rules.
+   */
+  private def analyzeAndVerifyExpression(
+    expressionStr: String,
+    fieldName: String,
+    schema: StructType,
+    statementType: String): Unit = {
+    // Parse the expression string
+    val parsed: Expression = try {
+      parser.parseExpression(expressionStr)
+    } catch {
+      case ex: ParseException =>
+        // Shouldn't be possible since we check that the expression is a valid 
catalyst expression
+        // during parsing
+        throw new AnalysisException(
+          s"Failed to execute $statementType command because the column 
$fieldName has " +
+            s"generation expression $expressionStr which fails to parse as a 
valid expression:" +
+            s"\n${ex.getMessage}")
+    }
+    // Analyze the parse result
+    // Generated column can't reference itself
+    val relation = new LocalRelation(StructType(schema.filterNot(_.name == 
fieldName)).toAttributes)
+    val plan = try {
+      val analyzer: Analyzer = GeneratedColumnAnalyzer
+      val analyzed = analyzer.execute(Project(Seq(Alias(parsed, fieldName)()), 
relation))
+      analyzer.checkAnalysis(analyzed)
+      analyzed
+    } catch {
+      case ex: AnalysisException =>
+        // Improve error message if possible
+        if (ex.getErrorClass == "UNRESOLVED_COLUMN.WITH_SUGGESTION") {
+          ex.messageParameters.get("objectName").filter(_ == 
toSQLId(fieldName)).foreach { _ =>
+            // Generation expression references itself
+            throw new AnalysisException(
+              errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
+              messageParameters = Map(
+                "fieldName" -> fieldName,
+                "expressionStr" -> expressionStr,
+                "reason" -> "generation expression cannot reference itself",
+                "errorMessage" -> ex.getMessage))
+          }
+        }
+        if (ex.getErrorClass == "UNRESOLVED_ROUTINE") {
+          // Cannot resolve function using built-in catalog
+          ex.messageParameters.get("routineName").foreach { fnName =>
+            throw new AnalysisException(
+              errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
+              messageParameters = Map(
+                "fieldName" -> fieldName,
+                "expressionStr" -> expressionStr,
+                "reason" -> s"failed to resolve $fnName to a built-in 
function",
+                "errorMessage" -> ex.getMessage))
+          }
+        }
+        throw new AnalysisException(
+          errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
+          messageParameters = Map(
+            "fieldName" -> fieldName,
+            "expressionStr" -> expressionStr,
+            "reason" -> "the expression fails to resolve as a valid 
expression",
+            "errorMessage" -> ex.getMessage))
+    }
+    val analyzed = plan.collectFirst {
+      case Project(Seq(a: Alias), _: LocalRelation) => a.child
+    }.get
+    // todo: additional verifications?

Review Comment:
   Yes, we should.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala:
##########
@@ -178,6 +178,15 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
           schema, tableSpec.provider, "CREATE TABLE", false)
+
+      if (GeneratedColumn.hasGeneratedColumns(newSchema)) {

Review Comment:
   +1, then we can dedup with L214-L220 below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to