AnishMahto commented on code in PR #56419:
URL: https://github.com/apache/spark/pull/56419#discussion_r3406660538


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AutoCdcIntoCommand.scala:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+
+/**
+ * Logical plan node for an AUTO CDC INTO command, used by Spark Declarative Pipelines.
+ *
+ * This represents a CDC (Change Data Capture) operation that applies an ordered change event
+ * stream from [[sourceTable]] into [[targetTable]] using SCD Type 1 (upsert) semantics.
+ *
+ * This node serves as a parse-time placeholder for a pipeline CDC definition and cannot be
+ * executed directly. It will be interpreted by the pipeline submodule once execution support
+ * is added (SPARK-57402).
+ *
+ * @param targetTable    The target table to apply changes into.
+ * @param sourceTable    The source relation providing the change events.
+ * @param keys           Column(s) that uniquely identify a row in the target table.
+ * @param deleteCondition An optional expression that marks a source row as a DELETE operation.
+ *                        When absent, all source rows are treated as upserts.
+ * @param sequenceByExpr Expression that orders CDC events to correctly resolve out-of-order
+ *                       arrivals. Must evaluate to a sortable type. Required.
+ * @param specifiedCols  An explicit list of source columns to include in the target table.
+ *                       Mutually exclusive with [[exceptCols]].
+ * @param exceptCols     Source columns to exclude from the target table (i.e., all columns
+ *                       except these). Mutually exclusive with [[specifiedCols]].
+ */
+case class AutoCdcIntoCommand(
+    targetTable: TableIdentifier,
+    sourceTable: LogicalPlan,
+    keys: Seq[UnresolvedAttribute],
+    deleteCondition: Option[Expression],
+    sequenceByExpr: Expression,
+    specifiedCols: Seq[UnresolvedAttribute],
+    exceptCols: Seq[UnresolvedAttribute]
+) extends LeafCommand {

Review Comment:
   I forget, does `LeafCommand` imply the command has no child logical plans? 
Any chance we should be using `UnaryCommand` given the `sourceTable` child?
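   
   For reference, a minimal sketch of why the distinction matters. The traits below are simplified stand-ins, not Catalyst's actual `LeafCommand`/`UnaryCommand`; `Plan`, `Relation`, `CdcAsLeaf`, `CdcAsUnary`, and `reachable` are hypothetical names. As far as I know, a leaf node reports no children, so a generic tree traversal would not descend into `sourceTable` when it is only held as a field:
   
   ```scala
   object LeafVsUnarySketch {
     // Simplified stand-in for a logical plan node: traversals only see `children`.
     sealed trait Plan { def children: Seq[Plan] }

     // Leaf-style node: never exposes children, whatever fields it carries.
     trait LeafLike extends Plan { final def children: Seq[Plan] = Nil }

     // Unary-style node: exposes exactly one child.
     trait UnaryLike extends Plan {
       def child: Plan
       final def children: Seq[Plan] = Seq(child)
     }

     final case class Relation(name: String) extends LeafLike
     // Hypothetical: the source plan is an ordinary field, invisible to traversals.
     final case class CdcAsLeaf(target: String, source: Plan) extends LeafLike
     // Hypothetical: the source plan is the node's single child.
     final case class CdcAsUnary(target: String, child: Plan) extends UnaryLike

     // Generic pre-order traversal that only follows `children`.
     def reachable(p: Plan): Seq[Plan] = p +: p.children.flatMap(reachable)
   }
   ```
   
   In this sketch, `reachable(CdcAsLeaf("target", Relation("source")))` contains only the command itself, while the unary variant also visits the source relation.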



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5529,6 +5529,12 @@
     },
     "sqlState" : "0A000"
   },
+  "MISSING_CLAUSES_FOR_OPERATION" : {
+    "message" : [
+      "Missing required clause(s) <clauses> for operation <operation>."
+    ],
+    "sqlState" : "42601"

Review Comment:
   To me this sounds like a semantic error rather than a syntactic one. Maybe use 
42613 or something else?



##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -417,7 +417,11 @@ statement
     | createPipelineDatasetHeader (LEFT_PAREN tableElementList? RIGHT_PAREN)? tableProvider?
         createTableClauses
         (AS query)?                                                    #createPipelineDataset
+    | createPipelineDatasetHeader (LEFT_PAREN tableElementList? RIGHT_PAREN)? tableProvider?
+        createTableClauses
+        FLOW autoCdcBody                                               #createStreamingTableAutoCdc

Review Comment:
   Did we consider collapsing `createStreamingTableAutoCdc` into 
   `createPipelineDataset`, using `|` for the query clause?
   
   I.e., is there a good reason for duplicating the shared prefix 
   `createPipelineDatasetHeader (LEFT_PAREN tableElementList? RIGHT_PAREN)? tableProvider? createTableClauses`?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AutoCdcParserSuite.scala:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.catalyst.analysis.{
+  AnalysisTest, UnresolvedAttribute,
+  UnresolvedIdentifier, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.{
+  AutoCdcIntoCommand,
+  CreateFlowCommand,
+  CreateStreamingTableAutoCdc
+}
+import org.apache.spark.sql.execution.SparkSqlParser
+
+/**
+ * Parser tests for AUTO CDC syntax.
+ *
+ * Covers two supported forms:
+ *   1. CREATE FLOW <name> [COMMENT ...] AS AUTO CDC INTO <target> ...
+ *   2. CREATE STREAMING TABLE <name> FLOW AUTO CDC ...
+ *
+ * Snapshot CDC, SCD Type 2, IGNORE NULL UPDATES, and APPLY AS TRUNCATE WHEN are not
+ * supported and should fail to parse. The standalone AUTO CDC INTO form (without CREATE FLOW
+ * or CREATE STREAMING TABLE) is also not supported.
+ */
+class AutoCdcParserSuite extends CommandSuiteBase with AnalysisTest {
+  protected lazy val parser = new SparkSqlParser()
+
+  // ---------------------------------------------------------------------------
+  // CREATE FLOW ... AS AUTO CDC INTO
+  // ---------------------------------------------------------------------------
+
+  test("CREATE FLOW AS AUTO CDC INTO - minimal form") {
+    val plan = parser.parsePlan(
+      """CREATE FLOW myflow AS AUTO CDC INTO target
+        |FROM source
+        |KEYS (key1, key2)
+        |SEQUENCE BY timestamp""".stripMargin)
+
+    val cmd = plan.asInstanceOf[CreateFlowCommand]
+    assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("myflow"))
+    assert(cmd.comment.isEmpty)
+
+    val cdc = cmd.flowOperation.asInstanceOf[AutoCdcIntoCommand]
+    assert(cdc.targetTable.table == "target")
+    assert(cdc.sourceTable.isInstanceOf[UnresolvedRelation])
+    assert(cdc.sourceTable.asInstanceOf[UnresolvedRelation].isStreaming)
+    assert(cdc.keys.map(_.name) == Seq("key1", "key2"))
+    assert(cdc.deleteCondition.isEmpty)
+    assert(cdc.sequenceByExpr == UnresolvedAttribute("timestamp"))
+    assert(cdc.specifiedCols.isEmpty)
+    assert(cdc.exceptCols.isEmpty)
+  }
+
+  test("CREATE FLOW AS AUTO CDC INTO - with COMMENT") {
+    val plan = parser.parsePlan(
+      """CREATE FLOW myflow COMMENT 'my comment' AS AUTO CDC INTO target
+        |FROM source
+        |KEYS (id)
+        |SEQUENCE BY ts""".stripMargin)
+
+    val cmd = plan.asInstanceOf[CreateFlowCommand]
+    assert(cmd.comment == Some("my comment"))
+  }
+
+  test("CREATE FLOW AS AUTO CDC INTO - multipart flow name") {
+    val plan = parser.parsePlan(
+      """CREATE FLOW mycat.myschema.myflow AS AUTO CDC INTO target
+        |FROM source
+        |KEYS (id)
+        |SEQUENCE BY ts""".stripMargin)
+
+    val cmd = plan.asInstanceOf[CreateFlowCommand]
+    assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts ==
+      Seq("mycat", "myschema", "myflow"))
+  }
+
+  test("CREATE FLOW AS AUTO CDC INTO - two-part target table name") {
+    val plan = parser.parsePlan(
+      """CREATE FLOW f AS AUTO CDC INTO myschema.mytable
+        |FROM source
+        |KEYS (k)
+        |SEQUENCE BY ts""".stripMargin)
+
+    val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+    assert(cdc.targetTable.database == Some("myschema"))
+    assert(cdc.targetTable.table == "mytable")
+  }
+
+  test("CREATE FLOW AS AUTO CDC INTO - APPLY AS DELETE WHEN") {
+    val plan = parser.parsePlan(
+      """CREATE FLOW f AS AUTO CDC INTO target
+        |FROM source
+        |KEYS (id)
+        |APPLY AS DELETE WHEN op = 'DELETE'
+        |SEQUENCE BY ts""".stripMargin)
+
+    val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+    assert(cdc.deleteCondition.isDefined)
+    assert(cdc.deleteCondition.get.sql.contains("op"))
+  }
+
+  test("CREATE FLOW AS AUTO CDC INTO - COLUMNS include list") {
+    val plan = parser.parsePlan(
+      """CREATE FLOW f AS AUTO CDC INTO target
+        |FROM source
+        |KEYS (id)
+        |SEQUENCE BY ts
+        |COLUMNS id, name, value""".stripMargin)
+
+    val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+    assert(cdc.specifiedCols.map(_.name) == Seq("id", "name", "value"))
+    assert(cdc.exceptCols.isEmpty)
+  }
+
+  test("CREATE FLOW AS AUTO CDC INTO - COLUMNS * EXCEPT list") {
+    val plan = parser.parsePlan(
+      """CREATE FLOW f AS AUTO CDC INTO target
+        |FROM source
+        |KEYS (id)
+        |SEQUENCE BY ts
+        |COLUMNS * EXCEPT (op, ts)""".stripMargin)
+
+    val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+    assert(cdc.specifiedCols.isEmpty)
+    assert(cdc.exceptCols.map(_.name) == Seq("op", "ts"))
+  }
+
+  test("CREATE FLOW AS AUTO CDC INTO - all clauses combined") {
+    val plan = parser.parsePlan(
+      """CREATE FLOW f AS AUTO CDC INTO target
+        |FROM source
+        |KEYS (key1, key2)
+        |APPLY AS DELETE WHEN key3 = 3
+        |SEQUENCE BY timestamp
+        |COLUMNS key1, key2, key3, timestamp""".stripMargin)
+
+    val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+    assert(cdc.keys.map(_.name) == Seq("key1", "key2"))
+    assert(cdc.deleteCondition.isDefined)
+    assert(cdc.sequenceByExpr == UnresolvedAttribute("timestamp"))
+    assert(cdc.specifiedCols.map(_.name) == Seq("key1", "key2", "key3", "timestamp"))
+  }
+
+  // ---------------------------------------------------------------------------
+  // CREATE STREAMING TABLE ... FLOW AUTO CDC
+  // ---------------------------------------------------------------------------
+
+  test("CREATE STREAMING TABLE FLOW AUTO CDC - minimal form") {
+    val plan = parser.parsePlan(
+      """CREATE STREAMING TABLE target
+        |FLOW AUTO CDC
+        |FROM source
+        |KEYS (key1, key2)
+        |SEQUENCE BY timestamp""".stripMargin)
+
+    val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+    assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("target"))
+    assert(!cmd.ifNotExists)
+    assert(cmd.sourceTable.isInstanceOf[UnresolvedRelation])
+    assert(cmd.sourceTable.asInstanceOf[UnresolvedRelation].isStreaming)
+    assert(cmd.keys.map(_.name) == Seq("key1", "key2"))
+    assert(cmd.deleteCondition.isEmpty)
+    assert(cmd.sequenceByExpr == UnresolvedAttribute("timestamp"))
+    assert(cmd.specifiedCols.isEmpty)
+    assert(cmd.exceptCols.isEmpty)
+  }
+
+  test("CREATE STREAMING TABLE IF NOT EXISTS FLOW AUTO CDC") {
+    val plan = parser.parsePlan(
+      """CREATE STREAMING TABLE IF NOT EXISTS target
+        |FLOW AUTO CDC
+        |FROM source
+        |KEYS (id)
+        |SEQUENCE BY ts""".stripMargin)
+
+    val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+    assert(cmd.ifNotExists)
+  }
+
+  test("CREATE STREAMING TABLE FLOW AUTO CDC - multipart table name") {
+    val plan = parser.parsePlan(
+      """CREATE STREAMING TABLE myschema.mytable
+        |FLOW AUTO CDC
+        |FROM source
+        |KEYS (id)
+        |SEQUENCE BY ts""".stripMargin)
+
+    val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+    assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("myschema", "mytable"))
+  }
+
+  test("CREATE STREAMING TABLE FLOW AUTO CDC - APPLY AS DELETE WHEN") {
+    val plan = parser.parsePlan(
+      """CREATE STREAMING TABLE target
+        |FLOW AUTO CDC
+        |FROM source
+        |KEYS (id)
+        |APPLY AS DELETE WHEN op = 'DELETE'
+        |SEQUENCE BY ts""".stripMargin)
+
+    val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+    assert(cmd.deleteCondition.isDefined)
+    assert(cmd.deleteCondition.get.sql.contains("op"))
+  }
+
+  test("CREATE STREAMING TABLE FLOW AUTO CDC - COLUMNS include list") {
+    val plan = parser.parsePlan(
+      """CREATE STREAMING TABLE target
+        |FLOW AUTO CDC
+        |FROM source
+        |KEYS (id)
+        |SEQUENCE BY ts
+        |COLUMNS id, name, value""".stripMargin)
+
+    val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+    assert(cmd.specifiedCols.map(_.name) == Seq("id", "name", "value"))
+    assert(cmd.exceptCols.isEmpty)
+  }
+
+  test("CREATE STREAMING TABLE FLOW AUTO CDC - COLUMNS * EXCEPT list") {
+    val plan = parser.parsePlan(
+      """CREATE STREAMING TABLE target
+        |FLOW AUTO CDC
+        |FROM source
+        |KEYS (id)
+        |SEQUENCE BY ts
+        |COLUMNS * EXCEPT (op, ts)""".stripMargin)
+
+    val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+    assert(cmd.specifiedCols.isEmpty)
+    assert(cmd.exceptCols.map(_.name) == Seq("op", "ts"))
+  }
+
+  test("CREATE STREAMING TABLE FLOW AUTO CDC - all clauses combined") {
+    val plan = parser.parsePlan(
+      """CREATE STREAMING TABLE target
+        |FLOW AUTO CDC
+        |FROM source
+        |KEYS (key1, key2)
+        |APPLY AS DELETE WHEN key3 = 3
+        |SEQUENCE BY timestamp
+        |COLUMNS * EXCEPT (key4)""".stripMargin)
+
+    val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+    assert(cmd.keys.map(_.name) == Seq("key1", "key2"))
+    assert(cmd.deleteCondition.isDefined)
+    assert(cmd.sequenceByExpr == UnresolvedAttribute("timestamp"))
+    assert(cmd.exceptCols.map(_.name) == Seq("key4"))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Error cases: missing required clause
+  // ---------------------------------------------------------------------------
+
+  test("CREATE FLOW AS AUTO CDC INTO - SEQUENCE BY is required") {
+    val e = intercept[ParseException] {
+      parser.parsePlan(
+        """CREATE FLOW f AS AUTO CDC INTO target
+          |FROM source
+          |KEYS (id)""".stripMargin)
+    }
+    assert(e.getCondition == "AUTOCDC_MISSING_SEQUENCE_BY")
+  }
+
+  test("CREATE STREAMING TABLE FLOW AUTO CDC - SEQUENCE BY is required") {
+    val e = intercept[ParseException] {
+      parser.parsePlan(
+        """CREATE STREAMING TABLE target
+          |FLOW AUTO CDC
+          |FROM source
+          |KEYS (id)""".stripMargin)
+    }
+    assert(e.getCondition == "AUTOCDC_MISSING_SEQUENCE_BY")
+  }
+
+  // ---------------------------------------------------------------------------
+  // Error cases: duplicate clauses
+  // ---------------------------------------------------------------------------
+
+  test("duplicate SEQUENCE BY clause") {
+    val e = intercept[ParseException] {
+      parser.parsePlan(
+        """CREATE FLOW f AS AUTO CDC INTO target
+          |FROM source
+          |KEYS (id)
+          |SEQUENCE BY ts1
+          |SEQUENCE BY ts2""".stripMargin)
+    }
+    assert(e.getCondition == "DUPLICATE_CLAUSES")
+  }
+
+  test("duplicate APPLY AS DELETE clause") {
+    val e = intercept[ParseException] {
+      parser.parsePlan(
+        """CREATE FLOW f AS AUTO CDC INTO target
+          |FROM source
+          |KEYS (id)
+          |APPLY AS DELETE WHEN a = 1
+          |APPLY AS DELETE WHEN b = 2
+          |SEQUENCE BY ts""".stripMargin)
+    }
+    assert(e.getCondition == "DUPLICATE_CLAUSES")
+  }
+
+  test("duplicate COLUMNS clause") {
+    val e = intercept[ParseException] {
+      parser.parsePlan(
+        """CREATE FLOW f AS AUTO CDC INTO target
+          |FROM source
+          |KEYS (id)
+          |SEQUENCE BY ts
+          |COLUMNS a, b
+          |COLUMNS c, d""".stripMargin)
+    }
+    assert(e.getCondition == "DUPLICATE_CLAUSES")
+  }
+
+  test("both COLUMNS include list and COLUMNS * EXCEPT is an error") {
+    val e = intercept[ParseException] {
+      parser.parsePlan(
+        """CREATE FLOW f AS AUTO CDC INTO target
+          |FROM source
+          |KEYS (id)
+          |SEQUENCE BY ts
+          |COLUMNS a, b
+          |COLUMNS * EXCEPT (c)""".stripMargin)
+    }
+    assert(e.getCondition == "AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST")
+  }
+
+  // ---------------------------------------------------------------------------
+  // Error cases: standalone form not supported
+  // ---------------------------------------------------------------------------
+
+  test("standalone AUTO CDC INTO is not supported") {
+    val e = intercept[ParseException] {
+      parser.parsePlan(
+        """AUTO CDC INTO target
+          |FROM source
+          |KEYS (id)
+          |SEQUENCE BY ts""".stripMargin)
+    }
+    assert(e.getCondition == "PARSE_SYNTAX_ERROR")
+  }
+
+  // ---------------------------------------------------------------------------
+  // Error cases: unsupported dataset types and table features
+  // ---------------------------------------------------------------------------
+
+  test("AUTO CDC is not supported for MATERIALIZED VIEW") {
+    val e = intercept[ParseException] {
+      parser.parsePlan(
+        """CREATE MATERIALIZED VIEW target
+          |FLOW AUTO CDC
+          |FROM source
+          |KEYS (id)
+          |SEQUENCE BY ts""".stripMargin)
+    }
+    assert(e.getMessage.contains("AUTO CDC is only supported for STREAMING TABLE"))

Review Comment:
   nit: here and in the other tests, use `checkError` and assert on the expected 
error condition and message parameters rather than on a message substring.
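   
   A sketch of the shape I have in mind. Everything here is a self-contained stand-in: `SketchParseException`, the local `checkError` and `intercept` helpers, the `parse` stub, the condition name `AUTOCDC_UNSUPPORTED_DATASET_TYPE`, and its `datasetType` parameter are all hypothetical, not the real Spark test utilities or error conditions. The point is only to compare structured fields instead of rendered message text:
   
   ```scala
   object CheckErrorSketch {
     // Stand-in for an exception that carries a structured error condition.
     final case class SketchParseException(
         condition: String,
         parameters: Map[String, String]) extends Exception(condition)

     // Stand-in helper: compares the structured fields, not the message text.
     def checkError(
         exception: SketchParseException,
         condition: String,
         parameters: Map[String, String]): Unit = {
       assert(exception.condition == condition,
         s"expected condition $condition, got ${exception.condition}")
       assert(exception.parameters == parameters,
         s"expected parameters $parameters, got ${exception.parameters}")
     }

     // Hypothetical parser stub that rejects AUTO CDC on a materialized view.
     def parse(sql: String): Unit =
       if (sql.startsWith("CREATE MATERIALIZED VIEW") && sql.contains("FLOW AUTO CDC")) {
         throw SketchParseException(
           "AUTOCDC_UNSUPPORTED_DATASET_TYPE", // hypothetical condition name
           Map("datasetType" -> "MATERIALIZED VIEW"))
       }

     // Minimal replacement for a test framework's intercept, to keep this standalone.
     def intercept(body: => Unit): SketchParseException =
       try { body; throw new AssertionError("expected an exception") }
       catch { case e: SketchParseException => e }
   }
   ```
   
   A test written this way keeps passing when the message wording changes, and fails if the wrong condition is raised.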



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5529,6 +5529,12 @@
     },
     "sqlState" : "0A000"
   },
+  "MISSING_CLAUSES_FOR_OPERATION" : {

Review Comment:
   Is this actually used anywhere?



##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -750,6 +754,38 @@ dmlStatementNoWith
         notMatchedBySourceClause*                                              #mergeIntoTable
     ;
 
+autoCdcCommand
+    : AUTO CDC INTO target=multipartIdentifier
+        autoCdcParameters
+    ;
+
+autoCdcBody
+    : AUTO CDC autoCdcParameters
+    ;
+
+autoCdcParameters
+    : FROM source=relationPrimary
+        KEYS LEFT_PAREN keys=multipartIdentifierList RIGHT_PAREN
+        (autoCdcDeleteClause
+        | autoCdcSequenceByClause
+        | autoCdcColumnsClause
+        )*

Review Comment:
   Is `*` the right operator to use here? It implies that a clause can appear 
more than once (multiple sequence clauses, for example), and it pushes validation 
down to the AST-building logic instead of rejecting the input at the syntax level.
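   
   To make the trade-off concrete, a minimal sketch of the kind of AST-side validation that `( ... )*` requires. `Clause`, its case classes, and `validate` are hypothetical names, not the PR's actual builder code; the two condition names are the ones asserted in `AutoCdcParserSuite`. The check for mixing an include list with `* EXCEPT` is left out for brevity:
   
   ```scala
   object ClauseValidationSketch {
     // Hypothetical representation of the optional clauses the grammar collects.
     sealed trait Clause { def name: String }
     final case class DeleteWhen(expr: String) extends Clause {
       val name = "APPLY AS DELETE WHEN"
     }
     final case class SequenceBy(expr: String) extends Clause {
       val name = "SEQUENCE BY"
     }
     final case class Columns(cols: Seq[String]) extends Clause {
       val name = "COLUMNS"
     }

     // Returns Left(errorCondition) when the clause list is invalid.
     def validate(clauses: Seq[Clause]): Either[String, Seq[Clause]] = {
       // Count how many times each kind of clause appears.
       val counts = clauses.groupBy(_.name).map { case (n, cs) => n -> cs.size }
       if (counts.values.exists(_ > 1)) Left("DUPLICATE_CLAUSES")
       else if (!counts.contains("SEQUENCE BY")) Left("AUTOCDC_MISSING_SEQUENCE_BY")
       else Right(clauses)
     }
   }
   ```
   
   Both checks run only after parsing succeeds, which is the cost being asked about; a stricter grammar would reject repeated clauses before any of this code runs.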



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

