AnishMahto commented on code in PR #56419: URL: https://github.com/apache/spark/pull/56419#discussion_r3406660538
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AutoCdcIntoCommand.scala: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} + +/** + * Logical plan node for an AUTO CDC INTO command, used by Spark Declarative Pipelines. + * + * This represents a CDC (Change Data Capture) operation that applies an ordered change event + * stream from [[sourceTable]] into [[targetTable]] using SCD Type 1 (upsert) semantics. + * + * This node serves as a parse-time placeholder for a pipeline CDC definition and cannot be + * executed directly. It will be interpreted by the pipeline submodule once execution support + * is added (SPARK-57402). + * + * @param targetTable The target table to apply changes into. + * @param sourceTable The source relation providing the change events. + * @param keys Column(s) that uniquely identify a row in the target table. 
+ * @param deleteCondition An optional expression that marks a source row as a DELETE operation.
+ *                        When absent, all source rows are treated as upserts.
+ * @param sequenceByExpr Expression that orders CDC events to correctly resolve out-of-order
+ *                       arrivals. Must evaluate to a sortable type. Required.
+ * @param specifiedCols An explicit list of source columns to include in the target table.
+ *                      Mutually exclusive with [[exceptCols]].
+ * @param exceptCols Source columns to exclude from the target table (i.e., all columns
+ *                   except these). Mutually exclusive with [[specifiedCols]].
+ */
+case class AutoCdcIntoCommand(
+    targetTable: TableIdentifier,
+    sourceTable: LogicalPlan,
+    keys: Seq[UnresolvedAttribute],
+    deleteCondition: Option[Expression],
+    sequenceByExpr: Expression,
+    specifiedCols: Seq[UnresolvedAttribute],
+    exceptCols: Seq[UnresolvedAttribute]
+) extends LeafCommand {

Review Comment:
I forget: does `LeafCommand` imply that the command has no child logical plans? Any chance we should be using `UnaryCommand`, given the `sourceTable` child?

##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5529,6 +5529,12 @@
     },
     "sqlState" : "0A000"
   },
+  "MISSING_CLAUSES_FOR_OPERATION" : {
+    "message" : [
+      "Missing required clause(s) <clauses> for operation <operation>."
+    ],
+    "sqlState" : "42601"

Review Comment:
To me this sounds like a semantic error rather than a syntax error; maybe use 42613 or something else?

##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -417,7 +417,11 @@ statement
     | createPipelineDatasetHeader (LEFT_PAREN tableElementList? RIGHT_PAREN)? tableProvider?
         createTableClauses
         (AS query)?                                                    #createPipelineDataset
+    | createPipelineDatasetHeader (LEFT_PAREN tableElementList? RIGHT_PAREN)? tableProvider?
+ createTableClauses + FLOW autoCdcBody #createStreamingTableAutoCdc Review Comment: Did we consider collapsing `createStreamingTableAutoCdc` with `createPipelineDataset` using `|` for the query clause? I.e is there a good reason for duplicating the shared `createPipelineDatasetHeader (LEFT_PAREN tableElementList? RIGHT_PAREN)? tableProvider? createTableClauses`. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AutoCdcParserSuite.scala: ########## @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.v2 + +import org.apache.spark.sql.catalyst.analysis.{ + AnalysisTest, UnresolvedAttribute, + UnresolvedIdentifier, UnresolvedRelation} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.{ + AutoCdcIntoCommand, + CreateFlowCommand, + CreateStreamingTableAutoCdc +} +import org.apache.spark.sql.execution.SparkSqlParser + +/** + * Parser tests for AUTO CDC syntax. + * + * Covers two supported forms: + * 1. CREATE FLOW <name> [COMMENT ...] AS AUTO CDC INTO <target> ... + * 2. CREATE STREAMING TABLE <name> FLOW AUTO CDC ... 
+ * + * Snapshot CDC, SCD Type 2, IGNORE NULL UPDATES, and APPLY AS TRUNCATE WHEN are not + * supported and should fail to parse. The standalone AUTO CDC INTO form (without CREATE FLOW + * or CREATE STREAMING TABLE) is also not supported. + */ +class AutoCdcParserSuite extends CommandSuiteBase with AnalysisTest { + protected lazy val parser = new SparkSqlParser() + + // --------------------------------------------------------------------------- + // CREATE FLOW ... AS AUTO CDC INTO + // --------------------------------------------------------------------------- + + test("CREATE FLOW AS AUTO CDC INTO - minimal form") { + val plan = parser.parsePlan( + """CREATE FLOW myflow AS AUTO CDC INTO target + |FROM source + |KEYS (key1, key2) + |SEQUENCE BY timestamp""".stripMargin) + + val cmd = plan.asInstanceOf[CreateFlowCommand] + assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("myflow")) + assert(cmd.comment.isEmpty) + + val cdc = cmd.flowOperation.asInstanceOf[AutoCdcIntoCommand] + assert(cdc.targetTable.table == "target") + assert(cdc.sourceTable.isInstanceOf[UnresolvedRelation]) + assert(cdc.sourceTable.asInstanceOf[UnresolvedRelation].isStreaming) + assert(cdc.keys.map(_.name) == Seq("key1", "key2")) + assert(cdc.deleteCondition.isEmpty) + assert(cdc.sequenceByExpr == UnresolvedAttribute("timestamp")) + assert(cdc.specifiedCols.isEmpty) + assert(cdc.exceptCols.isEmpty) + } + + test("CREATE FLOW AS AUTO CDC INTO - with COMMENT") { + val plan = parser.parsePlan( + """CREATE FLOW myflow COMMENT 'my comment' AS AUTO CDC INTO target + |FROM source + |KEYS (id) + |SEQUENCE BY ts""".stripMargin) + + val cmd = plan.asInstanceOf[CreateFlowCommand] + assert(cmd.comment == Some("my comment")) + } + + test("CREATE FLOW AS AUTO CDC INTO - multipart flow name") { + val plan = parser.parsePlan( + """CREATE FLOW mycat.myschema.myflow AS AUTO CDC INTO target + |FROM source + |KEYS (id) + |SEQUENCE BY ts""".stripMargin) + + val cmd = 
plan.asInstanceOf[CreateFlowCommand] + assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == + Seq("mycat", "myschema", "myflow")) + } + + test("CREATE FLOW AS AUTO CDC INTO - two-part target table name") { + val plan = parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO myschema.mytable + |FROM source + |KEYS (k) + |SEQUENCE BY ts""".stripMargin) + + val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand] + assert(cdc.targetTable.database == Some("myschema")) + assert(cdc.targetTable.table == "mytable") + } + + test("CREATE FLOW AS AUTO CDC INTO - APPLY AS DELETE WHEN") { + val plan = parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM source + |KEYS (id) + |APPLY AS DELETE WHEN op = 'DELETE' + |SEQUENCE BY ts""".stripMargin) + + val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand] + assert(cdc.deleteCondition.isDefined) + assert(cdc.deleteCondition.get.sql.contains("op")) + } + + test("CREATE FLOW AS AUTO CDC INTO - COLUMNS include list") { + val plan = parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM source + |KEYS (id) + |SEQUENCE BY ts + |COLUMNS id, name, value""".stripMargin) + + val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand] + assert(cdc.specifiedCols.map(_.name) == Seq("id", "name", "value")) + assert(cdc.exceptCols.isEmpty) + } + + test("CREATE FLOW AS AUTO CDC INTO - COLUMNS * EXCEPT list") { + val plan = parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM source + |KEYS (id) + |SEQUENCE BY ts + |COLUMNS * EXCEPT (op, ts)""".stripMargin) + + val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand] + assert(cdc.specifiedCols.isEmpty) + assert(cdc.exceptCols.map(_.name) == Seq("op", "ts")) + } + + test("CREATE FLOW AS AUTO CDC INTO - all clauses combined") { + val plan = parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target 
+ |FROM source + |KEYS (key1, key2) + |APPLY AS DELETE WHEN key3 = 3 + |SEQUENCE BY timestamp + |COLUMNS key1, key2, key3, timestamp""".stripMargin) + + val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand] + assert(cdc.keys.map(_.name) == Seq("key1", "key2")) + assert(cdc.deleteCondition.isDefined) + assert(cdc.sequenceByExpr == UnresolvedAttribute("timestamp")) + assert(cdc.specifiedCols.map(_.name) == Seq("key1", "key2", "key3", "timestamp")) + } + + // --------------------------------------------------------------------------- + // CREATE STREAMING TABLE ... FLOW AUTO CDC + // --------------------------------------------------------------------------- + + test("CREATE STREAMING TABLE FLOW AUTO CDC - minimal form") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM source + |KEYS (key1, key2) + |SEQUENCE BY timestamp""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("target")) + assert(!cmd.ifNotExists) + assert(cmd.sourceTable.isInstanceOf[UnresolvedRelation]) + assert(cmd.sourceTable.asInstanceOf[UnresolvedRelation].isStreaming) + assert(cmd.keys.map(_.name) == Seq("key1", "key2")) + assert(cmd.deleteCondition.isEmpty) + assert(cmd.sequenceByExpr == UnresolvedAttribute("timestamp")) + assert(cmd.specifiedCols.isEmpty) + assert(cmd.exceptCols.isEmpty) + } + + test("CREATE STREAMING TABLE IF NOT EXISTS FLOW AUTO CDC") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE IF NOT EXISTS target + |FLOW AUTO CDC + |FROM source + |KEYS (id) + |SEQUENCE BY ts""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.ifNotExists) + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - multipart table name") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE myschema.mytable + |FLOW AUTO CDC + |FROM source + |KEYS (id) + |SEQUENCE BY 
ts""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("myschema", "mytable")) + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - APPLY AS DELETE WHEN") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM source + |KEYS (id) + |APPLY AS DELETE WHEN op = 'DELETE' + |SEQUENCE BY ts""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.deleteCondition.isDefined) + assert(cmd.deleteCondition.get.sql.contains("op")) + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - COLUMNS include list") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM source + |KEYS (id) + |SEQUENCE BY ts + |COLUMNS id, name, value""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.specifiedCols.map(_.name) == Seq("id", "name", "value")) + assert(cmd.exceptCols.isEmpty) + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - COLUMNS * EXCEPT list") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM source + |KEYS (id) + |SEQUENCE BY ts + |COLUMNS * EXCEPT (op, ts)""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.specifiedCols.isEmpty) + assert(cmd.exceptCols.map(_.name) == Seq("op", "ts")) + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - all clauses combined") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM source + |KEYS (key1, key2) + |APPLY AS DELETE WHEN key3 = 3 + |SEQUENCE BY timestamp + |COLUMNS * EXCEPT (key4)""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.keys.map(_.name) == Seq("key1", "key2")) + assert(cmd.deleteCondition.isDefined) + assert(cmd.sequenceByExpr == UnresolvedAttribute("timestamp")) + assert(cmd.exceptCols.map(_.name) == Seq("key4")) + } + + // 
--------------------------------------------------------------------------- + // Error cases: missing required clause + // --------------------------------------------------------------------------- + + test("CREATE FLOW AS AUTO CDC INTO - SEQUENCE BY is required") { + val e = intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM source + |KEYS (id)""".stripMargin) + } + assert(e.getCondition == "AUTOCDC_MISSING_SEQUENCE_BY") + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - SEQUENCE BY is required") { + val e = intercept[ParseException] { + parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM source + |KEYS (id)""".stripMargin) + } + assert(e.getCondition == "AUTOCDC_MISSING_SEQUENCE_BY") + } + + // --------------------------------------------------------------------------- + // Error cases: duplicate clauses + // --------------------------------------------------------------------------- + + test("duplicate SEQUENCE BY clause") { + val e = intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM source + |KEYS (id) + |SEQUENCE BY ts1 + |SEQUENCE BY ts2""".stripMargin) + } + assert(e.getCondition == "DUPLICATE_CLAUSES") + } + + test("duplicate APPLY AS DELETE clause") { + val e = intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM source + |KEYS (id) + |APPLY AS DELETE WHEN a = 1 + |APPLY AS DELETE WHEN b = 2 + |SEQUENCE BY ts""".stripMargin) + } + assert(e.getCondition == "DUPLICATE_CLAUSES") + } + + test("duplicate COLUMNS clause") { + val e = intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM source + |KEYS (id) + |SEQUENCE BY ts + |COLUMNS a, b + |COLUMNS c, d""".stripMargin) + } + assert(e.getCondition == "DUPLICATE_CLAUSES") + } + + test("both COLUMNS include list and COLUMNS * EXCEPT is an error") { + val e = intercept[ParseException] { + 
parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM source + |KEYS (id) + |SEQUENCE BY ts + |COLUMNS a, b + |COLUMNS * EXCEPT (c)""".stripMargin) + } + assert(e.getCondition == "AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST") + } + + // --------------------------------------------------------------------------- + // Error cases: standalone form not supported + // --------------------------------------------------------------------------- + + test("standalone AUTO CDC INTO is not supported") { + val e = intercept[ParseException] { + parser.parsePlan( + """AUTO CDC INTO target + |FROM source + |KEYS (id) + |SEQUENCE BY ts""".stripMargin) + } + assert(e.getCondition == "PARSE_SYNTAX_ERROR") + } + + // --------------------------------------------------------------------------- + // Error cases: unsupported dataset types and table features + // --------------------------------------------------------------------------- + + test("AUTO CDC is not supported for MATERIALIZED VIEW") { + val e = intercept[ParseException] { + parser.parsePlan( + """CREATE MATERIALIZED VIEW target + |FLOW AUTO CDC + |FROM source + |KEYS (id) + |SEQUENCE BY ts""".stripMargin) + } + assert(e.getMessage.contains("AUTO CDC is only supported for STREAMING TABLE")) Review Comment: nit: here and other tests, use `checkError` and assert on expected error conditions/message parameters. ########## common/utils/src/main/resources/error/error-conditions.json: ########## @@ -5529,6 +5529,12 @@ }, "sqlState" : "0A000" }, + "MISSING_CLAUSES_FOR_OPERATION" : { Review Comment: Is this actually used anywhere? 
##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -750,6 +754,38 @@ dmlStatementNoWith
         notMatchedBySourceClause*                                      #mergeIntoTable
     ;
+autoCdcCommand
+    : AUTO CDC INTO target=multipartIdentifier
+      autoCdcParameters
+    ;
+
+autoCdcBody
+    : AUTO CDC autoCdcParameters
+    ;
+
+autoCdcParameters
+    : FROM source=relationPrimary
+      KEYS LEFT_PAREN keys=multipartIdentifierList RIGHT_PAREN
+      (autoCdcDeleteClause
+      | autoCdcSequenceByClause
+      | autoCdcColumnsClause
+      )*

Review Comment:
Is `*` the right quantifier to use here? It implies, for example, that there can be multiple SEQUENCE BY clauses, and it pushes validation down into the AST-building logic instead of rejecting such input at the syntax level.
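To make the trade-off in the `*` question concrete, here is a minimal sketch of the check that the AST-building code has to perform when the grammar accepts `(a | b | c)*`. Everything in it is an assumption for illustration: `AutoCdcClause`, `ClauseError`, and `AutoCdcClauseCheck` are hypothetical stand-ins, not the PR's actual parse-tree contexts or error plumbing. Only the condition names `DUPLICATE_CLAUSES` and `AUTOCDC_MISSING_SEQUENCE_BY` are taken from the tests in this PR. The separate include-list versus `* EXCEPT` check is not modelled.

```scala
// Hypothetical stand-ins for the clause contexts the parser would hand over.
sealed trait AutoCdcClause { def clauseName: String }
final case class DeleteClause(condition: String) extends AutoCdcClause {
  val clauseName = "APPLY AS DELETE WHEN"
}
final case class SequenceByClause(expr: String) extends AutoCdcClause {
  val clauseName = "SEQUENCE BY"
}
final case class ColumnsClause(cols: Seq[String]) extends AutoCdcClause {
  val clauseName = "COLUMNS"
}

// Hypothetical error value; a real implementation would raise a ParseException.
final case class ClauseError(condition: String, clause: String)

object AutoCdcClauseCheck {
  // With `( ... )*` the grammar accepts any number of clauses in any order,
  // so "at most once" and "SEQUENCE BY is required" must be enforced here.
  def validate(clauses: Seq[AutoCdcClause]): Either[ClauseError, Seq[AutoCdcClause]] = {
    val counts = clauses.groupBy(_.clauseName).map { case (name, cs) => name -> cs.size }
    // Report the first clause (in source order) that appears more than once.
    clauses.map(_.clauseName).find(name => counts(name) > 1) match {
      case Some(name) =>
        Left(ClauseError("DUPLICATE_CLAUSES", name))
      case None if !counts.contains("SEQUENCE BY") =>
        Left(ClauseError("AUTOCDC_MISSING_SEQUENCE_BY", "SEQUENCE BY"))
      case None =>
        Right(clauses)
    }
  }
}
```

The upside of this shape is that clauses can be written in any order and the error names the offending clause. The downside is the one raised above: the grammar alone no longer documents which clauses are optional, required, or repeatable.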
