szehon-ho commented on code in PR #56419:
URL: https://github.com/apache/spark/pull/56419#discussion_r3509168244
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -1646,6 +1659,10 @@ class SparkSqlAstBuilder extends AstBuilder {
)
if (createPipelineDatasetHeaderCtx.materializedView() != null) {
+ if (ctx.autoCdcBody() != null) {
+ throw operationNotAllowed(
Review Comment:
Should this new user-facing message go through a proper named error
condition instead of `operationNotAllowed` (which maps to
`_LEGACY_ERROR_TEMP_0035`)? We're generally trying to move away from
`_LEGACY_ERROR_TEMP_*` for new errors. That said, it's consistent with the
neighboring pipeline-dataset guards below, so I'm fine leaving it if you'd
rather keep them uniform for now -- just raising the question.
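
For illustration, here is a minimal sketch of what a named condition could look like at this guard. It uses a stand-in exception type rather than Spark's real `ParseException`, and the condition name `AUTO_CDC_NOT_SUPPORTED_FOR_MATERIALIZED_VIEW`, the `SketchParseException` class, and the `PipelineGuards.checkAutoCdc` helper are all hypothetical names made up for this example, not existing Spark identifiers.

```scala
// Stand-in for a parser exception that carries a named error condition.
// Illustration only; this is not Spark's ParseException.
final case class SketchParseException(
    condition: String,
    messageParameters: Map[String, String])
  extends Exception(s"[$condition] $messageParameters")

object PipelineGuards {
  // Hypothetical condition name; not an existing Spark error condition.
  val AutoCdcOnMaterializedView = "AUTO_CDC_NOT_SUPPORTED_FOR_MATERIALIZED_VIEW"

  // Mirrors the shape of the guard in the diff: reject an AUTO CDC body
  // on a materialized view, but with a named condition instead of a
  // generic "operation not allowed" message.
  def checkAutoCdc(isMaterializedView: Boolean, hasAutoCdcBody: Boolean): Unit =
    if (isMaterializedView && hasAutoCdcBody) {
      throw SketchParseException(AutoCdcOnMaterializedView, Map.empty)
    }
}
```

With a named condition, tests can assert on the condition string instead of matching message text.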
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -7710,3 +7762,14 @@ class AstBuilder extends DataTypeAstBuilder
}
}
}
+
+/**
+ * Parameters parsed from an AUTO CDC clause.
+ */
+case class AutoCdcParams(
+ source: LogicalPlan,
Review Comment:
`resolveAutoCdcSource` always returns an `UnresolvedRelation` (and the tests
assert that), so could `AutoCdcParams.source` be typed as `UnresolvedRelation`
to document that invariant? (The `source` on `AutoCdcIntoCommand` /
`CreateStreamingTableAutoCdc` needs to stay `LogicalPlan` since it's a
resolvable plan child, but this transient parser carrier isn't a child.) Or is
the wider `LogicalPlan` type intentional to leave room for other source shapes
later?
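
To make the suggestion concrete, here is a rough sketch with stand-in types (not the real Catalyst classes). `AutoCdcParams` and `AutoCdcIntoCommand` are trimmed down to the `source` field only, and `AutoCdcSketch.toCommand` is a hypothetical helper. The point is only that narrowing the carrier's field costs nothing where it is handed to the plan node.

```scala
// Stand-in types for illustration only; the real ones live in Spark Catalyst.
sealed trait LogicalPlan
final case class UnresolvedRelation(
    multipartIdentifier: Seq[String],
    isStreaming: Boolean = false) extends LogicalPlan

// Trimmed-down parser carrier: the narrower type documents the invariant
// that the parser always produces an UnresolvedRelation here.
final case class AutoCdcParams(source: UnresolvedRelation)

// The plan node keeps the wider type because its child is resolved later.
final case class AutoCdcIntoCommand(source: LogicalPlan)

object AutoCdcSketch {
  // Stand-in for resolveAutoCdcSource: always yields a streaming relation.
  def resolveAutoCdcSource(nameParts: Seq[String]): UnresolvedRelation =
    UnresolvedRelation(nameParts, isStreaming = true)

  // Hypothetical helper: passing the narrower type where a LogicalPlan is
  // expected is a plain upcast, so no cast is needed at the hand-off.
  def toCommand(params: AutoCdcParams): AutoCdcIntoCommand =
    AutoCdcIntoCommand(params.source)
}
```

If other source shapes are planned, the wider type on the carrier would of course be the right call.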
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AutoCdcParserSuite.scala:
##########
@@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.catalyst.analysis.{
+ AnalysisTest, UnresolvedAttribute, UnresolvedIdentifier, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.{
+ AutoCdcIntoCommand,
+ CreateFlowCommand,
+ CreateStreamingTableAutoCdc
+}
+import org.apache.spark.sql.execution.SparkSqlParser
+
+/**
+ * Parser tests for AUTO CDC syntax.
+ *
+ * Covers two supported forms:
+ * 1. CREATE FLOW <name> [COMMENT ...] AS AUTO CDC INTO <target> ...
+ * 2. CREATE STREAMING TABLE <name> FLOW AUTO CDC ...
+ *
+ * Snapshot CDC, SCD Type 2, IGNORE NULL UPDATES, and APPLY AS TRUNCATE WHEN are not
+ * supported and should fail to parse. The standalone AUTO CDC INTO form (without CREATE FLOW
+ * or CREATE STREAMING TABLE) is also not supported.
+ */
+class AutoCdcParserSuite extends CommandSuiteBase with AnalysisTest {
+ protected lazy val parser = new SparkSqlParser()
+
+ // ---------------------------------------------------------------------------
+ // CREATE FLOW ... AS AUTO CDC INTO
+ // ---------------------------------------------------------------------------
+
+ test("CREATE FLOW AS AUTO CDC INTO - minimal form") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW myflow AS AUTO CDC INTO target
+ |FROM STREAM(source)
+ |KEYS (key1, key2)
+ |SEQUENCE BY timestamp""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateFlowCommand]
+ assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("myflow"))
+ assert(cmd.comment.isEmpty)
+
+ val cdc = cmd.flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ assert(cdc.targetTable.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("target"))
+ val source = cdc.source.asInstanceOf[UnresolvedRelation]
+ assert(source.multipartIdentifier == Seq("source"))
+ assert(source.isStreaming)
+ assert(cdc.keys.map(_.name) == Seq("key1", "key2"))
+ assert(cdc.deleteCondition.isEmpty)
+ assert(cdc.sequenceByExpr == UnresolvedAttribute("timestamp"))
+ assert(cdc.includeColumns.isEmpty)
+ assert(cdc.excludeColumns.isEmpty)
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - multipart source name") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW myflow AS AUTO CDC INTO target
+ |FROM STREAM(mycat.myschema.source)
+ |KEYS (id)
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ val source = cdc.source.asInstanceOf[UnresolvedRelation]
+ assert(source.multipartIdentifier == Seq("mycat", "myschema", "source"))
+ assert(source.isStreaming)
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - with COMMENT") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW myflow COMMENT 'my comment' AS AUTO CDC INTO target
+ |FROM STREAM(source)
+ |KEYS (id)
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateFlowCommand]
+ assert(cmd.comment == Some("my comment"))
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - multipart flow name") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW mycat.myschema.myflow AS AUTO CDC INTO target
+ |FROM STREAM(source)
+ |KEYS (id)
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateFlowCommand]
+ assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts ==
+ Seq("mycat", "myschema", "myflow"))
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - two-part target table name") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW f AS AUTO CDC INTO myschema.mytable
+ |FROM STREAM(source)
+ |KEYS (k)
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ assert(cdc.targetTable.asInstanceOf[UnresolvedIdentifier].nameParts ==
+ Seq("myschema", "mytable"))
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - three-part target table name") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW f AS AUTO CDC INTO mycat.myschema.mytable
+ |FROM STREAM(source)
+ |KEYS (k)
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ assert(cdc.targetTable.asInstanceOf[UnresolvedIdentifier].nameParts ==
+ Seq("mycat", "myschema", "mytable"))
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - APPLY AS DELETE WHEN") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW f AS AUTO CDC INTO target
+ |FROM STREAM(source)
+ |KEYS (id)
+ |APPLY AS DELETE WHEN op = 'DELETE'
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ assert(cdc.deleteCondition.isDefined)
+ assert(cdc.deleteCondition.get.sql.contains("op"))
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - COLUMNS include list") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW f AS AUTO CDC INTO target
+ |FROM STREAM(source)
+ |KEYS (id)
+ |SEQUENCE BY ts
+ |COLUMNS (id, name, value)""".stripMargin)
+
+ val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ assert(cdc.includeColumns.get.map(_.name) == Seq("id", "name", "value"))
+ assert(cdc.excludeColumns.isEmpty)
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - COLUMNS * EXCEPT list") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW f AS AUTO CDC INTO target
+ |FROM STREAM(source)
+ |KEYS (id)
+ |SEQUENCE BY ts
+ |COLUMNS * EXCEPT (op, ts)""".stripMargin)
+
+ val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ assert(cdc.includeColumns.isEmpty)
+ assert(cdc.excludeColumns.get.map(_.name) == Seq("op", "ts"))
+ }
+
+ test("CREATE FLOW AS AUTO CDC INTO - all clauses combined") {
+ val plan = parser.parsePlan(
+ """CREATE FLOW f AS AUTO CDC INTO target
+ |FROM STREAM(source)
+ |KEYS (key1, key2)
+ |APPLY AS DELETE WHEN key3 = 3
+ |SEQUENCE BY timestamp
+ |COLUMNS (key1, key2, key3, timestamp)""".stripMargin)
+
+ val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand]
+ assert(cdc.keys.map(_.name) == Seq("key1", "key2"))
+ assert(cdc.deleteCondition.isDefined)
+ assert(cdc.sequenceByExpr == UnresolvedAttribute("timestamp"))
+ assert(cdc.includeColumns.get.map(_.name) == Seq("key1", "key2", "key3", "timestamp"))
+ }
+
+ // ---------------------------------------------------------------------------
+ // CREATE STREAMING TABLE ... FLOW AUTO CDC
+ // ---------------------------------------------------------------------------
+
+ test("CREATE STREAMING TABLE FLOW AUTO CDC - minimal form") {
+ val plan = parser.parsePlan(
+ """CREATE STREAMING TABLE target
+ |FLOW AUTO CDC
+ |FROM STREAM(source)
+ |KEYS (key1, key2)
+ |SEQUENCE BY timestamp""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+ assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("target"))
+ assert(!cmd.ifNotExists)
+ val source = cmd.source.asInstanceOf[UnresolvedRelation]
+ assert(source.multipartIdentifier == Seq("source"))
+ assert(source.isStreaming)
+ assert(cmd.keys.map(_.name) == Seq("key1", "key2"))
+ assert(cmd.deleteCondition.isEmpty)
+ assert(cmd.sequenceByExpr == UnresolvedAttribute("timestamp"))
+ assert(cmd.includeColumns.isEmpty)
+ assert(cmd.excludeColumns.isEmpty)
+ }
+
+ test("CREATE STREAMING TABLE FLOW AUTO CDC - multipart source name") {
+ val plan = parser.parsePlan(
+ """CREATE STREAMING TABLE target
+ |FLOW AUTO CDC
+ |FROM STREAM(mycat.myschema.source)
+ |KEYS (id)
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+ val source = cmd.source.asInstanceOf[UnresolvedRelation]
+ assert(source.multipartIdentifier == Seq("mycat", "myschema", "source"))
+ assert(source.isStreaming)
+ }
+
+ test("CREATE STREAMING TABLE IF NOT EXISTS FLOW AUTO CDC") {
+ val plan = parser.parsePlan(
+ """CREATE STREAMING TABLE IF NOT EXISTS target
+ |FLOW AUTO CDC
+ |FROM STREAM(source)
+ |KEYS (id)
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+ assert(cmd.ifNotExists)
+ }
+
+ test("CREATE STREAMING TABLE FLOW AUTO CDC - multipart table name") {
+ val plan = parser.parsePlan(
+ """CREATE STREAMING TABLE myschema.mytable
+ |FLOW AUTO CDC
+ |FROM STREAM(source)
+ |KEYS (id)
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+ assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("myschema", "mytable"))
+ }
+
+ test("CREATE STREAMING TABLE FLOW AUTO CDC - APPLY AS DELETE WHEN") {
+ val plan = parser.parsePlan(
+ """CREATE STREAMING TABLE target
+ |FLOW AUTO CDC
+ |FROM STREAM(source)
+ |KEYS (id)
+ |APPLY AS DELETE WHEN op = 'DELETE'
+ |SEQUENCE BY ts""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+ assert(cmd.deleteCondition.isDefined)
+ assert(cmd.deleteCondition.get.sql.contains("op"))
+ }
+
+ test("CREATE STREAMING TABLE FLOW AUTO CDC - COLUMNS include list") {
+ val plan = parser.parsePlan(
+ """CREATE STREAMING TABLE target
+ |FLOW AUTO CDC
+ |FROM STREAM(source)
+ |KEYS (id)
+ |SEQUENCE BY ts
+ |COLUMNS (id, name, value)""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+ assert(cmd.includeColumns.get.map(_.name) == Seq("id", "name", "value"))
+ assert(cmd.excludeColumns.isEmpty)
+ }
+
+ test("CREATE STREAMING TABLE FLOW AUTO CDC - COLUMNS * EXCEPT list") {
+ val plan = parser.parsePlan(
+ """CREATE STREAMING TABLE target
+ |FLOW AUTO CDC
+ |FROM STREAM(source)
+ |KEYS (id)
+ |SEQUENCE BY ts
+ |COLUMNS * EXCEPT (op, ts)""".stripMargin)
+
+ val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc]
+ assert(cmd.includeColumns.isEmpty)
+ assert(cmd.excludeColumns.get.map(_.name) == Seq("op", "ts"))
+ }
+
+ test("CREATE STREAMING TABLE FLOW AUTO CDC - all clauses combined") {
Review Comment:
Nice coverage of the AUTO CDC clauses here. `CreateStreamingTableAutoCdc`
also carries `partitioning`, `tableSpec`, and `columns`, but I don't see a
positive test exercising `PARTITIONED BY` / `COMMENT` / `TBLPROPERTIES` flowing
through on the AUTO CDC streaming-table form. Could we add one so those
pass-through fields don't regress silently?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]