c27kwan commented on code in PR #47614:
URL: https://github.com/apache/spark/pull/47614#discussion_r1707138967
##########
sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala:
##########
@@ -542,6 +542,23 @@ private[sql] object QueryParsingErrors extends
DataTypeErrorsBase {
ctx)
}
+ def identityColumnUnsupportedDataType(
+ ctx: IdentityColumnContext,
+ dataType: String): Throwable = {
Review Comment:
style: we don't do vertical alignment with the `(` for the arguments
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -3481,11 +3488,60 @@ class AstBuilder extends DataTypeAstBuilder
/**
* Create a generation expression string.
*/
- override def visitGenerationExpression(ctx: GenerationExpressionContext):
String =
+ override def visitGeneratedColumn(ctx: GeneratedColumnContext): String =
withOrigin(ctx) {
getDefaultExpression(ctx.expression(), "GENERATED").originalSQL
}
+ /**
+ * Parse and verify IDENTITY column definition.
+ *
+ * @param ctx The parser context.
+ * @param dataType The data type of column defined as IDENTITY column. Used
for verification.
+ * @return Tuple containing start, step and allowExplicitInsert.
+ */
+ protected def parseIdentityColumn(
Review Comment:
`visit` instead of `parse` for consistency?
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala:
##########
@@ -2854,10 +2856,139 @@ class DDLParserSuite extends AnalysisTest {
exception = parseException(
"CREATE TABLE my_tab(a INT, b INT GENERATED ALWAYS AS a + 1) USING
PARQUET"),
errorClass = "PARSE_SYNTAX_ERROR",
- parameters = Map("error" -> "'a'", "hint" -> ": missing '('")
+ parameters = Map("error" -> "'a'", "hint" -> "")
)
}
+ test("SPARK-48824: implement parser support for GENERATED ALWAYS AS IDENTITY
columns in tables") {
Review Comment:
The test name should also mention BY DEFAULT, since the test covers both ALWAYS and BY DEFAULT.
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala:
##########
@@ -2854,10 +2856,139 @@ class DDLParserSuite extends AnalysisTest {
exception = parseException(
"CREATE TABLE my_tab(a INT, b INT GENERATED ALWAYS AS a + 1) USING
PARQUET"),
errorClass = "PARSE_SYNTAX_ERROR",
- parameters = Map("error" -> "'a'", "hint" -> ": missing '('")
+ parameters = Map("error" -> "'a'", "hint" -> "")
)
}
+ test("SPARK-48824: implement parser support for GENERATED ALWAYS AS IDENTITY
columns in tables") {
+ for {
+ identityColumnDefStr <- Seq("BY DEFAULT", "ALWAYS")
+ identityColumnSpecStr <- Seq(
+ "START WITH 1 INCREMENT BY 1",
+ "INCREMENT BY 1 START WITH 1",
+ "START WITH 1",
+ "INCREMENT BY 1",
+ "")
+ } {
+ val columnsWithIdentitySpec = Seq(
+ ColumnDefinition(
+ "id",
+ LongType,
+ nullable = true,
+ identityColumnSpec = Some(
+ IdentityColumnSpec(
+ start = 1,
+ step = 1,
+ allowExplicitInsert = identityColumnDefStr == "BY DEFAULT"
+ )
+ )
+ ),
+ ColumnDefinition("val", IntegerType)
+ )
+ comparePlans(
+ parsePlan(
+ s"CREATE TABLE my_tab(id BIGINT GENERATED $identityColumnDefStr" +
+ s" AS IDENTITY($identityColumnSpecStr), val INT) USING parquet"
Review Comment:
Do we have a test where the parentheses after `AS IDENTITY` are omitted
entirely?
##########
sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala:
##########
@@ -542,6 +542,23 @@ private[sql] object QueryParsingErrors extends
DataTypeErrorsBase {
ctx)
}
+ def identityColumnUnsupportedDataType(
+ ctx: IdentityColumnContext,
+ dataType: String): Throwable = {
+ new ParseException(
+ "IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE", Map("dataType" -> dataType),
ctx)
+ }
+
+ def identityColumnIllegalStep(ctx: IdentityColSpecContext): Throwable = {
+ new ParseException("IDENTITY_COLUMNS_ILLEGAL_STEP", Map.empty, ctx)
+ }
+
+ def identityColumnDuplicatedDescriptor(
+ ctx: IdentityColSpecContext, option: String):
Throwable = {
Review Comment:
indent is off.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -3481,11 +3488,60 @@ class AstBuilder extends DataTypeAstBuilder
/**
* Create a generation expression string.
*/
- override def visitGenerationExpression(ctx: GenerationExpressionContext):
String =
+ override def visitGeneratedColumn(ctx: GeneratedColumnContext): String =
withOrigin(ctx) {
getDefaultExpression(ctx.expression(), "GENERATED").originalSQL
}
+ /**
+ * Parse and verify IDENTITY column definition.
+ *
+ * @param ctx The parser context.
+ * @param dataType The data type of column defined as IDENTITY column. Used
for verification.
+ * @return Tuple containing start, step and allowExplicitInsert.
+ */
+ protected def parseIdentityColumn(
+ ctx: IdentityColumnContext,
+ dataType: DataType): IdentityColumnSpec =
{
+ if (dataType != LongType) {
+ throw QueryParsingErrors.identityColumnUnsupportedDataType(ctx,
dataType.toString)
+ }
Review Comment:
Maybe this should be a decision for the underlying framework. Theoretically,
a framework extending Spark might want to support INT type identity columns. So
I think we can do two things:
1. Delegate this decision to the underlying framework by offering an
interface they can override (do we have an example of such code in the codebase?)
2. Allowlist every valid type for identity columns, knowing that the `step`
and `start` are defined as `integers`. Let the underlying framework that
extends Spark throw its own error if it doesn't want to support more than
LongType.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala:
##########
@@ -41,7 +42,9 @@ case class ColumnDefinition(
comment: Option[String] = None,
defaultValue: Option[DefaultValueExpression] = None,
generationExpression: Option[String] = None,
+ identityColumnSpec: Option[IdentityColumnSpec] = None,
metadata: Metadata = Metadata.empty) extends Expression with Unevaluable {
+ assert(generationExpression.isEmpty || identityColumnSpec.isEmpty)
Review Comment:
This assertion needs a short sentence to give it context, e.g. "A ColumnDefinition
cannot contain both a generation expression and an identity column spec".
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -1534,6 +1534,24 @@
],
"sqlState" : "42601"
},
+ "IDENTITY_COLUMNS_DUPLICATED_DESCRIPTOR" : {
+ "message" : [
+ "Duplicated IDENTITY column descriptor: <option>."
Review Comment:
`descriptor` instead of `option`? Should we display the duplicates instead
of just the keyword?
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java:
##########
@@ -59,5 +59,21 @@ public enum TableCatalogCapability {
* {@link TableCatalog#createTable}.
* See {@link Column#defaultValue()}.
*/
- SUPPORT_COLUMN_DEFAULT_VALUE
+ SUPPORT_COLUMN_DEFAULT_VALUE,
+
+ /**
+ * Signals that the TableCatalog supports defining identity columns upon
table creation in SQL.
+ * <p>
+ * Without this capability, any create/replace table statements with an
identity column defined
+ * in the table schema will throw an exception during analysis.
+ * <p>
+ * An identity column is defined with syntax:
+ * {@code colName colType GENERATED (ALWAYS | BY DEFAULT) AS
IDENTITY(identityColumnSpec)}
Review Comment:
I think this is a bit awkward because `(` is being used both as the literal
LPAREN token and as a grouping bracket. Can we split the syntax explanation here into two,
one for generated always and one for generated by default?
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala:
##########
@@ -2854,10 +2856,139 @@ class DDLParserSuite extends AnalysisTest {
exception = parseException(
"CREATE TABLE my_tab(a INT, b INT GENERATED ALWAYS AS a + 1) USING
PARQUET"),
errorClass = "PARSE_SYNTAX_ERROR",
- parameters = Map("error" -> "'a'", "hint" -> ": missing '('")
+ parameters = Map("error" -> "'a'", "hint" -> "")
)
}
+ test("SPARK-48824: implement parser support for GENERATED ALWAYS AS IDENTITY
columns in tables") {
+ for {
+ identityColumnDefStr <- Seq("BY DEFAULT", "ALWAYS")
+ identityColumnSpecStr <- Seq(
+ "START WITH 1 INCREMENT BY 1",
+ "INCREMENT BY 1 START WITH 1",
+ "START WITH 1",
+ "INCREMENT BY 1",
Review Comment:
I'd like to see tests with non-default parameter values for the step and
start so that we can test that code path more clearly. Negative steps and
starts would also be interesting.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala:
##########
@@ -75,8 +86,20 @@ case class ColumnDefinition(
generationExpression.foreach { generationExpr =>
metadataBuilder.putString(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY,
generationExpr)
}
+ encodeIdentityColumnSpec(metadataBuilder)
StructField(name, dataType, nullable, metadataBuilder.build())
}
+
+ private def encodeIdentityColumnSpec(metadataBuilder: MetadataBuilder): Unit
= {
+ identityColumnSpec.foreach{ spec: IdentityColumnSpec =>
Review Comment:
```suggestion
identityColumnSpec.foreach { spec: IdentityColumnSpec =>
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -3481,11 +3488,60 @@ class AstBuilder extends DataTypeAstBuilder
/**
* Create a generation expression string.
*/
- override def visitGenerationExpression(ctx: GenerationExpressionContext):
String =
+ override def visitGeneratedColumn(ctx: GeneratedColumnContext): String =
withOrigin(ctx) {
getDefaultExpression(ctx.expression(), "GENERATED").originalSQL
}
+ /**
+ * Parse and verify IDENTITY column definition.
+ *
+ * @param ctx The parser context.
+ * @param dataType The data type of column defined as IDENTITY column. Used
for verification.
+ * @return Tuple containing start, step and allowExplicitInsert.
+ */
+ protected def parseIdentityColumn(
+ ctx: IdentityColumnContext,
+ dataType: DataType): IdentityColumnSpec =
{
Review Comment:
indent/alignment
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -3481,11 +3488,60 @@ class AstBuilder extends DataTypeAstBuilder
/**
* Create a generation expression string.
*/
- override def visitGenerationExpression(ctx: GenerationExpressionContext):
String =
+ override def visitGeneratedColumn(ctx: GeneratedColumnContext): String =
withOrigin(ctx) {
getDefaultExpression(ctx.expression(), "GENERATED").originalSQL
}
+ /**
+ * Parse and verify IDENTITY column definition.
+ *
+ * @param ctx The parser context.
+ * @param dataType The data type of column defined as IDENTITY column. Used
for verification.
+ * @return Tuple containing start, step and allowExplicitInsert.
+ */
+ protected def parseIdentityColumn(
+ ctx: IdentityColumnContext,
+ dataType: DataType): IdentityColumnSpec =
{
+ if (dataType != LongType) {
+ throw QueryParsingErrors.identityColumnUnsupportedDataType(ctx,
dataType.toString)
+ }
+ // We support two flavors of syntax:
+ // (1) GENERATED ALWAYS AS IDENTITY (...)
+ // (2) GENERATED BY DEFAULT AS IDENTITY (...)
+ // (1) forbids explicit inserts, while (2) allows.
+ val allowExplicitInsert = ctx.BY() != null && ctx.DEFAULT() != null
+ val (start, step) = visitIdentityColSpec(ctx.identityColSpec())
+
+ IdentityColumnSpec(start, step, allowExplicitInsert)
+ }
+
+ override def visitIdentityColSpec(ctx: IdentityColSpecContext): (Long, Long)
= withOrigin(ctx) {
+ val defaultStart = 1
+ val defaultStep = 1
+ if (ctx == null) {
+ return (defaultStart, defaultStep)
+ }
+ var (start, step): (Option[Long], Option[Long]) = (None, None)
+ ctx.identityColSpecOption().asScala.foreach { option =>
+ if (option.start != null) {
+ if (start.isDefined) {
+ throw QueryParsingErrors.identityColumnDuplicatedDescriptor(ctx,
"START")
+ }
+ start = Some(option.start.getText.toLong)
+ } else if (option.step != null) {
+ if (step.isDefined) {
+ throw QueryParsingErrors.identityColumnDuplicatedDescriptor(ctx,
"STEP")
+ }
+ step = Some(option.step.getText.toLong)
+ if (step.get == 0L) {
+ throw QueryParsingErrors.identityColumnIllegalStep(ctx)
+ }
+ }
Review Comment:
Should there be a final `else` branch that throws an error for an unexpected option?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IdentityColumn.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog,
TableCatalogCapability}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.{StructField, StructType}
+
+case class IdentityColumnSpec(start: Long, step: Long, allowExplicitInsert:
Boolean)
+
+/**
+ * This object contains utility methods and values for Identity Columns
+ */
+object IdentityColumn {
+ val IDENTITY_INFO_START = "identity.start"
+ val IDENTITY_INFO_STEP = "identity.step"
+ val IDENTITY_INFO_ALLOW_EXPLICIT_INSERT = "identity.allowExplicitInsert"
+
+ def validateIdentityColumn(
Review Comment:
Could we add a short Scaladoc comment here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]