szehon-ho commented on code in PR #56419:
URL: https://github.com/apache/spark/pull/56419#discussion_r3398976541
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -824,6 +824,47 @@ case class CreateStreamingTable(
copy(name = newChild)
}
+/**
+ * Command parsed from `CREATE STREAMING TABLE <name> FLOW AUTO CDC ...` SQL syntax.
+ * This command serves as a parse-time placeholder for a pipeline CDC definition and cannot be
+ * executed directly. It is interpreted by the pipeline submodule during a pipeline execution.
Review Comment:
This says the node "is interpreted by the pipeline submodule during a
pipeline execution," but there's currently no handler:
`SqlGraphRegistrationContext.processSqlQuery` has no case for
`CreateStreamingTableAutoCdc` (it hits the `unsupportedLogicalPlan` branch),
and `CreateFlowHandler` only accepts `InsertIntoStatement`. So this SQL parses
but always fails at registration today. Please soften to something like "will
be interpreted once execution support is added" (and link the follow-up), or
add the handling. Same wording applies to `AutoCdcIntoCommand`.
##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4:
##########
@@ -139,8 +139,10 @@ AND: 'AND';
ANTI: 'ANTI';
ANY: 'ANY';
ANY_VALUE: 'ANY_VALUE';
+APPLY: 'APPLY';
APPROX: 'APPROX';
ARCHIVE: 'ARCHIVE';
+AUTO: 'AUTO';
Review Comment:
`AUTO` is out of alphabetical order here (between `ARCHIVE` and `ARRAY`); it
should sit after `AUTHORIZATION`. Same issue for `SEQUENCE` (between `SCHEMA`
and `SCHEMAS`), and in `nonReserved` / the `sql-ref-ansi-compliance.md` table
(where `AUTO` is listed before `ATOMIC`). The generated `.sql.out` golden files
are already correctly sorted — just the hand-maintained lists are off. Minor,
but worth fixing since you already have an ordering-fix commit.
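This kind of drift can be caught mechanically. A minimal sketch, assuming the hand-maintained lists are meant to follow plain lexicographic `String` order (an assumption on my part). The object and method names are hypothetical, and the token list is a shortened excerpt rather than the full lexer:

```scala
object KeywordOrderCheck {
  /**
   * Returns each adjacent pair (previous, next) in which `next` sorts before
   * `previous`. Assumes plain lexicographic String comparison.
   */
  def outOfOrderPairs(tokens: Seq[String]): Seq[(String, String)] =
    tokens.zip(tokens.drop(1)).filter { case (prev, next) => prev.compareTo(next) > 0 }

  def main(args: Array[String]): Unit = {
    // Shortened, illustrative excerpt of the token list in the order this PR leaves it.
    val excerpt = Seq("ANY", "ANY_VALUE", "APPLY", "APPROX", "ARCHIVE", "AUTO", "ARRAY")
    outOfOrderPairs(excerpt).foreach { case (prev, next) =>
      println(s"$prev is listed before $next but sorts after it")
    }
  }
}
```

On that excerpt the check flags the `AUTO`/`ARRAY` pair. The same function applied to `SCHEMA`, `SEQUENCE`, `SCHEMAS` flags `SEQUENCE`/`SCHEMAS`.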
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,50 @@ class AstBuilder extends DataTypeAstBuilder
withSchemaEvolution)
}
+ protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext): AutoCdcIntoCommand =
+ withOrigin(ctx) {
+ val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+ val (src, keys, delete, seq, specCols, exceptCols) =
+ parseAutoCdcParams(ctx.autoCdcParameters())
+ AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+ }
+
+ protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+ LogicalPlan,
+ Seq[UnresolvedAttribute],
+ Option[Expression],
+ Expression,
+ Seq[UnresolvedAttribute],
+ Seq[UnresolvedAttribute]) =
+ withOrigin(params) {
+ checkDuplicateClauses(params.autoCdcDeleteClause(), "APPLY AS DELETE", params)
+ checkDuplicateClauses(params.autoCdcSequenceByClause(), "SEQUENCE BY", params)
+ checkDuplicateClauses(params.autoCdcColumnsClause(), "COLUMNS", params)
+
+ if (params.autoCdcSequenceByClause().isEmpty) {
+ throw QueryParsingErrors.missingClausesForOperation(
Review Comment:
The Connect path reports specific conditions for the same mistakes:
`AUTOCDC_MISSING_SEQUENCE_BY`, `AUTOCDC_MISSING_SOURCE`,
`AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST`, and
`AUTOCDC_NON_COLUMN_IDENTIFIER` (see `PipelinesHandler.buildAutoCdcFlow`). Here
a missing `SEQUENCE BY` raises `MISSING_CLAUSES_FOR_OPERATION` instead, so the
same mistake surfaces under a different error condition depending on whether
the user came via SQL or Python. Could we reuse the `AUTOCDC_*` conditions (at
minimum for missing sequence-by) for consistency across front-ends?
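To illustrate the shape I have in mind, here is a minimal sketch in which both front-ends would funnel through one validation step that names the condition. `AutoCdcClauses` and `firstViolation` are hypothetical and simplified, not existing Spark classes, and the order of the checks is my assumption. Only the condition names come from the Connect path:

```scala
object AutoCdcValidation {
  /** Hypothetical, simplified view of the parsed clauses; not a Spark class. */
  final case class AutoCdcClauses(
      source: Option[String],
      sequenceBy: Option[String],
      columns: Seq[String],
      exceptColumns: Seq[String])

  /**
   * Returns the name of the first violated condition, if any. The check order
   * is an assumption made for this sketch.
   */
  def firstViolation(clauses: AutoCdcClauses): Option[String] = {
    if (clauses.source.isEmpty) {
      Some("AUTOCDC_MISSING_SOURCE")
    } else if (clauses.sequenceBy.isEmpty) {
      Some("AUTOCDC_MISSING_SEQUENCE_BY")
    } else if (clauses.columns.nonEmpty && clauses.exceptColumns.nonEmpty) {
      Some("AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST")
    } else {
      None
    }
  }
}
```

With something along these lines, a SQL statement that omits `SEQUENCE BY` would report `AUTOCDC_MISSING_SEQUENCE_BY`, matching what a Python user sees.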
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,50 @@ class AstBuilder extends DataTypeAstBuilder
withSchemaEvolution)
}
+ protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext): AutoCdcIntoCommand =
+ withOrigin(ctx) {
+ val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+ val (src, keys, delete, seq, specCols, exceptCols) =
+ parseAutoCdcParams(ctx.autoCdcParameters())
+ AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+ }
+
+ protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+ LogicalPlan,
+ Seq[UnresolvedAttribute],
+ Option[Expression],
+ Expression,
+ Seq[UnresolvedAttribute],
+ Seq[UnresolvedAttribute]) =
+ withOrigin(params) {
+ checkDuplicateClauses(params.autoCdcDeleteClause(), "APPLY AS DELETE", params)
+ checkDuplicateClauses(params.autoCdcSequenceByClause(), "SEQUENCE BY", params)
+ checkDuplicateClauses(params.autoCdcColumnsClause(), "COLUMNS", params)
+
+ if (params.autoCdcSequenceByClause().isEmpty) {
+ throw QueryParsingErrors.missingClausesForOperation(
+ params, "SEQUENCE BY", "AUTO CDC INTO")
+ }
+
+ val sourceTable = plan(params.source.relationPrimary)
Review Comment:
`plan(params.source.relationPrimary)` only consumes the `relationPrimary` of
`source=relation`. Since `relation : LATERAL? relationPrimary
relationExtension*`, any join/pivot/unpivot/`LATERAL` on the source is parsed
and then silently discarded. For example, both of these parse fine but use only
`source`:
```sql
-- JOIN dropped; only `source` is read
CREATE FLOW f AS AUTO CDC INTO target
FROM source JOIN dim ON source.id = dim.id
KEYS (id) SEQUENCE BY ts;

-- PIVOT dropped; only `source` is read
CREATE FLOW f AS AUTO CDC INTO target
FROM source PIVOT (sum(amt) FOR region IN ('US','EU'))
KEYS (id) SEQUENCE BY ts;
```
Either restrict the grammar to `source=relationPrimary` so the extra syntax
is a parse error, or build from the full `relation`. Also note the Connect path
models the source as a streaming relation (`UnresolvedRelation(isStreaming =
true)`); here it's a plain relation with no streaming marker, so resolution
semantics will diverge from the Python/Connect path.
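If the grammar stays as `source=relation`, the builder could at least fail loudly. A minimal sketch of that guard, using a hypothetical `ParsedRelation` stand-in for the `relation` rule (it is not a Spark or ANTLR class, and the error messages are made up for illustration):

```scala
object SourceRelationCheck {
  /** Hypothetical stand-in for a parsed `relation`; not a Spark or ANTLR class. */
  final case class ParsedRelation(
      lateral: Boolean,
      primary: String,
      extensions: Seq[String])

  /**
   * Accepts only a bare relationPrimary. LATERAL or any relationExtension is
   * reported as an error instead of being silently dropped.
   */
  def sourceOrError(rel: ParsedRelation): Either[String, String] = {
    if (rel.lateral) {
      Left("LATERAL is not supported on an AUTO CDC source")
    } else if (rel.extensions.nonEmpty) {
      val clauses = rel.extensions.mkString(", ")
      Left("Unsupported clause(s) on AUTO CDC source: " + clauses)
    } else {
      Right(rel.primary)
    }
  }
}
```

Under this sketch the `JOIN` and `PIVOT` statements above would be rejected, while a plain `FROM source` still yields `source`.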
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AutoCdcParserSuite.scala:
##########
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.catalyst.analysis.{
+ AnalysisTest, UnresolvedAttribute,
+ UnresolvedIdentifier, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.{
+ AutoCdcIntoCommand,
+ CreateFlowCommand,
+ CreateStreamingTableAutoCdc
+}
+import org.apache.spark.sql.execution.SparkSqlParser
+
+/**
+ * Parser tests for AUTO CDC syntax.
+ *
+ * Covers two supported forms:
+ * 1. CREATE FLOW <name> [COMMENT ...] AS AUTO CDC INTO <target> ...
+ * 2. CREATE STREAMING TABLE <name> FLOW AUTO CDC ...
+ *
+ * Snapshot CDC, SCD Type 2, IGNORE NULL UPDATES, and APPLY AS TRUNCATE WHEN are not
+ * supported and should fail to parse. The standalone AUTO CDC INTO form (without CREATE FLOW
+ * or CREATE STREAMING TABLE) is also not supported.
+ */
+class AutoCdcParserSuite extends CommandSuiteBase with AnalysisTest {
+ protected lazy val parser = new SparkSqlParser()
+
+ // ---------------------------------------------------------------------------
+ // CREATE FLOW ... AS AUTO CDC INTO
+ // ---------------------------------------------------------------------------
+
+ test("CREATE FLOW AS AUTO CDC INTO - minimal form") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW myflow AS AUTO CDC INTO target
+ |FROM source
+ |KEYS (key1, key2)
+ |SEQUENCE BY timestamp""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateFlowCommand]
+ assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("myflow"))
+ assert(cmd.comment.isEmpty)
+
+ val cdc = cmd.flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ assert(cdc.targetTable.table == "target")
+ assert(cdc.sourceTable.isInstanceOf[UnresolvedRelation])
+ assert(cdc.keys.map(_.name) == Seq("key1", "key2"))
+ assert(cdc.deleteCondition.isEmpty)
+ assert(cdc.sequenceByExpr == UnresolvedAttribute("timestamp"))
+ assert(cdc.specifiedCols.isEmpty)
+ assert(cdc.exceptCols.isEmpty)
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - with COMMENT") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW myflow COMMENT 'my comment' AS AUTO CDC INTO target
+ |FROM source
+ |KEYS (id)
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateFlowCommand]
+ assert(cmd.comment == Some("my comment"))
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - multipart flow name") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW mycat.myschema.myflow AS AUTO CDC INTO target
+ |FROM source
+ |KEYS (id)
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateFlowCommand]
+ assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts ==
+ Seq("mycat", "myschema", "myflow"))
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - two-part target table name") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW f AS AUTO CDC INTO myschema.mytable
+ |FROM source
+ |KEYS (k)
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ assert(cdc.targetTable.database == Some("myschema"))
+ assert(cdc.targetTable.table == "mytable")
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - APPLY AS DELETE WHEN") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW f AS AUTO CDC INTO target
+ |FROM source
+ |KEYS (id)
+ |APPLY AS DELETE WHEN op = 'DELETE'
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ assert(cdc.deleteCondition.isDefined)
+ assert(cdc.deleteCondition.get.sql.contains("op"))
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - COLUMNS include list") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW f AS AUTO CDC INTO target
+ |FROM source
+ |KEYS (id)
+ |SEQUENCE BY ts
+ |COLUMNS id, name, value""".stripMargin)
+
+ val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ assert(cdc.specifiedCols.map(_.name) == Seq("id", "name", "value"))
+ assert(cdc.exceptCols.isEmpty)
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - COLUMNS * EXCEPT list") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW f AS AUTO CDC INTO target
+ |FROM source
+ |KEYS (id)
+ |SEQUENCE BY ts
+ |COLUMNS * EXCEPT (op, ts)""".stripMargin)
+
+ val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ assert(cdc.specifiedCols.isEmpty)
+ assert(cdc.exceptCols.map(_.name) == Seq("op", "ts"))
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - all clauses combined") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW f AS AUTO CDC INTO target
+ |FROM source
+ |KEYS (key1, key2)
+ |APPLY AS DELETE WHEN key3 = 3
+ |SEQUENCE BY timestamp
+ |COLUMNS key1, key2, key3, timestamp""".stripMargin)
+
+ val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ assert(cdc.keys.map(_.name) == Seq("key1", "key2"))
+ assert(cdc.deleteCondition.isDefined)
+ assert(cdc.sequenceByExpr == UnresolvedAttribute("timestamp"))
+ assert(cdc.specifiedCols.map(_.name) == Seq("key1", "key2", "key3", "timestamp"))
+ }
+
+ // ---------------------------------------------------------------------------
+ // CREATE STREAMING TABLE ... FLOW AUTO CDC
+ // ---------------------------------------------------------------------------
+
+ test("CREATE STREAMING TABLE FLOW AUTO CDC - minimal form") {
+ val plan = parser.parsePlan(
+ """CREATE STREAMING TABLE target
+ |FLOW AUTO CDC
+ |FROM source
+ |KEYS (key1, key2)
+ |SEQUENCE BY timestamp""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+ assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("target"))
+ assert(!cmd.ifNotExists)
+ assert(cmd.sourceTable.isInstanceOf[UnresolvedRelation])
+ assert(cmd.keys.map(_.name) == Seq("key1", "key2"))
+ assert(cmd.deleteCondition.isEmpty)
+ assert(cmd.sequenceByExpr == UnresolvedAttribute("timestamp"))
+ assert(cmd.specifiedCols.isEmpty)
+ assert(cmd.exceptCols.isEmpty)
+ }
+
+ test("CREATE STREAMING TABLE IF NOT EXISTS FLOW AUTO CDC") {
+ val plan = parser.parsePlan(
+ """CREATE STREAMING TABLE IF NOT EXISTS target
+ |FLOW AUTO CDC
+ |FROM source
+ |KEYS (id)
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+ assert(cmd.ifNotExists)
+ }
+
+ test("CREATE STREAMING TABLE FLOW AUTO CDC - multipart table name") {
+ val plan = parser.parsePlan(
+ """CREATE STREAMING TABLE myschema.mytable
+ |FLOW AUTO CDC
+ |FROM source
+ |KEYS (id)
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+ assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("myschema", "mytable"))
+ }
+
+ test("CREATE STREAMING TABLE FLOW AUTO CDC - APPLY AS DELETE WHEN") {
+ val plan = parser.parsePlan(
+ """CREATE STREAMING TABLE target
+ |FLOW AUTO CDC
+ |FROM source
+ |KEYS (id)
+ |APPLY AS DELETE WHEN op = 'DELETE'
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+ assert(cmd.deleteCondition.isDefined)
+ assert(cmd.deleteCondition.get.sql.contains("op"))
+ }
+
+ test("CREATE STREAMING TABLE FLOW AUTO CDC - COLUMNS include list") {
+ val plan = parser.parsePlan(
+ """CREATE STREAMING TABLE target
+ |FLOW AUTO CDC
+ |FROM source
+ |KEYS (id)
+ |SEQUENCE BY ts
+ |COLUMNS id, name, value""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+ assert(cmd.specifiedCols.map(_.name) == Seq("id", "name", "value"))
+ assert(cmd.exceptCols.isEmpty)
+ }
+
+ test("CREATE STREAMING TABLE FLOW AUTO CDC - COLUMNS * EXCEPT list") {
+ val plan = parser.parsePlan(
+ """CREATE STREAMING TABLE target
+ |FLOW AUTO CDC
+ |FROM source
+ |KEYS (id)
+ |SEQUENCE BY ts
+ |COLUMNS * EXCEPT (op, ts)""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+ assert(cmd.specifiedCols.isEmpty)
+ assert(cmd.exceptCols.map(_.name) == Seq("op", "ts"))
+ }
+
+ test("CREATE STREAMING TABLE FLOW AUTO CDC - all clauses combined") {
Review Comment:
`visitCreateStreamingTableAutoCdc` rejects MATERIALIZED VIEW, bucketing,
options, serde, location, and column constraints via `operationNotAllowed`, but
none of those branches are covered here. Please add at least the MV-rejection
case (and ideally column-constraints), since they're easy to regress silently.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -1516,6 +1516,98 @@ class SparkSqlAstBuilder extends AstBuilder {
)
}
+ override def visitCreateFlowAutoCdc(
+ ctx: CreateFlowAutoCdcContext): LogicalPlan = withOrigin(ctx) {
+ val flowHeaderCtx = ctx.createPipelineFlowHeader()
+ val ident = withIdentClause(flowHeaderCtx.flowName, UnresolvedIdentifier(_))
+ val commentOpt = Option(flowHeaderCtx.commentSpec()).map(visitCommentSpec)
+ val applyChanges = buildAutoCdcIntoCommand(ctx.autoCdcCommand())
+ CreateFlowCommand(
+ name = ident,
+ flowOperation = applyChanges,
+ comment = commentOpt
+ )
+ }
+
+ override def visitCreateStreamingTableAutoCdc(
+ ctx: CreateStreamingTableAutoCdcContext): LogicalPlan = withOrigin(ctx) {
+ val headerCtx = ctx.createPipelineDatasetHeader()
+
+ if (headerCtx.materializedView() != null) {
+ throw operationNotAllowed(
+ "AUTO CDC is only supported for STREAMING TABLE, not MATERIALIZED VIEW.", ctx)
+ }
+
+ val ifNotExists = headerCtx.EXISTS() != null
+ val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
+ val (colDefs, colConstraints) = Option(ctx.tableElementList()).map(visitTableElementList)
Review Comment:
This whole prelude — the `colConstraints` check, the
`visitCreateTableClauses` destructure, the `partitioning` computation, the
`bucketSpec`/`options`/`serde`/`location` guards, and the `TableSpec(...)`
block — is duplicated almost verbatim from `visitCreatePipelineDataset` below.
Suggest extracting a shared private helper (e.g. returning `(colDefs,
partitioning, spec, ifNotExists, tableIdent)`) and calling it from both, so
they can't drift.
Note the copy also quietly shortened the error messages (e.g. dropping the
"Please remove any CHECK, UNIQUE, PK, and FK constraints..." guidance, and
collapsing the `STORED AS` vs Hive-SerDe distinction). Factoring out the helper
would keep these consistent with the existing `CREATE STREAMING TABLE` messages.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]