AnishMahto commented on code in PR #56419:
URL: https://github.com/apache/spark/pull/56419#discussion_r3410612346


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
       withSchemaEvolution)
   }
 
+  protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext): 
AutoCdcIntoCommand =
+    withOrigin(ctx) {
+      val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+      val (src, keys, delete, seq, specCols, exceptCols) =
+        parseAutoCdcParams(ctx.autoCdcParameters())
+      AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+    }
+
+  protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+      LogicalPlan,
+      Seq[UnresolvedAttribute],
+      Option[Expression],
+      Expression,
+      Seq[UnresolvedAttribute],
+      Seq[UnresolvedAttribute]) =
+    withOrigin(params) {
+      val sourceTable = plan(params.source) match {
+        case r: UnresolvedRelation => r.copy(isStreaming = true)

Review Comment:
   Instead of silently forcing the unresolved relation to be streaming, can we 
just throw if the user tries specifying a non-streaming relation? The error can 
guide users to use `STREAM`.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
       withSchemaEvolution)
   }
 
+  protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext): 
AutoCdcIntoCommand =
+    withOrigin(ctx) {
+      val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+      val (src, keys, delete, seq, specCols, exceptCols) =
+        parseAutoCdcParams(ctx.autoCdcParameters())
+      AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+    }
+
+  protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+      LogicalPlan,
+      Seq[UnresolvedAttribute],
+      Option[Expression],
+      Expression,
+      Seq[UnresolvedAttribute],
+      Seq[UnresolvedAttribute]) =
+    withOrigin(params) {
+      val sourceTable = plan(params.source) match {
+        case r: UnresolvedRelation => r.copy(isStreaming = true)
+        case other => other

Review Comment:
   Eventually in SDP we expect `source` to simply be a string identifier: 
   
https://github.com/apache/spark/blob/62e4e166698f95bcbeb16687ceaa61a3a8f14dbc/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala#L444
   
   
https://github.com/apache/spark/blob/62e4e166698f95bcbeb16687ceaa61a3a8f14dbc/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto#L162



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
       withSchemaEvolution)
   }
 
+  protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext): 
AutoCdcIntoCommand =
+    withOrigin(ctx) {
+      val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+      val (src, keys, delete, seq, specCols, exceptCols) =
+        parseAutoCdcParams(ctx.autoCdcParameters())
+      AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+    }
+
+  protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+      LogicalPlan,
+      Seq[UnresolvedAttribute],
+      Option[Expression],
+      Expression,
+      Seq[UnresolvedAttribute],
+      Seq[UnresolvedAttribute]) =
+    withOrigin(params) {
+      val sourceTable = plan(params.source) match {
+        case r: UnresolvedRelation => r.copy(isStreaming = true)
+        case other => other

Review Comment:
   Should we reject all other logical plans here?
   
   I don't know how this concretely maps against logical plan subclasses, but 
at the syntax level we should only be allowing source to be a multipart 
identifier, or STREAM(multipart identifier).
   
   I _think_ both of these will map directly to an `UnresolvedRelation`. And 
then can we take this a step further, so that `AutoCdcIntoCommand` requires 
`source` to be a `TableIdentifier`/`Identifier` rather than an arbitrary 
`LogicalPlan`? Then I guess `AutoCdcIntoCommand` can go back to being a leaf 
plan type.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -1548,22 +1549,43 @@ class SparkSqlAstBuilder extends AstBuilder {
     )
   }
 
-  override def visitCreatePipelineDataset(
-      ctx: CreatePipelineDatasetContext): LogicalPlan = withOrigin(ctx) {
-    val createPipelineDatasetHeaderCtx = ctx.createPipelineDatasetHeader()
-
-    val syntaxTypeErrorStr = if 
(createPipelineDatasetHeaderCtx.materializedView() != null) {
-      "MATERIALIZED VIEW"
-    } else if (createPipelineDatasetHeaderCtx.streamingTable() != null) {
-      "STREAMING TABLE"
-    } else {
-      // Should never be possible based on grammar definition.
-      throw invalidStatement(ctx.getText, ctx)
-    }
+  override def visitCreateFlowAutoCdc(
+      ctx: CreateFlowAutoCdcContext): LogicalPlan = withOrigin(ctx) {
+    val flowHeaderCtx = ctx.createPipelineFlowHeader()
+    val ident = withIdentClause(flowHeaderCtx.flowName, 
UnresolvedIdentifier(_))
+    val commentOpt = Option(flowHeaderCtx.commentSpec()).map(visitCommentSpec)
+    val applyChanges = buildAutoCdcIntoCommand(ctx.autoCdcCommand())
+    CreateFlowCommand(
+      name = ident,
+      flowOperation = applyChanges,
+      comment = commentOpt
+    )
+  }
 
-    val ifNotExists = createPipelineDatasetHeaderCtx.EXISTS() != null
-    val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
-    val (colDefs, colConstraints) = 
Option(ctx.tableElementList()).map(visitTableElementList)
+  /**
+   * Shared helper for pipeline dataset creation statements (CREATE STREAMING 
TABLE,
+   * CREATE MATERIALIZED VIEW, CREATE STREAMING TABLE ... FLOW AUTO CDC).
+   *
+   * Validates and extracts column definitions, partitioning, and table spec 
from the common
+   * grammar elements shared by these statements.
+   *
+   * @return (colDefs, partitioning, spec, ifNotExists, tableIdent)
+   */
+  private def parsePipelineDatasetPrelude(

Review Comment:
   Do we still need this function now that the same `createPipelineDataset` 
rule is hit for both `AS query` and `FLOW AUTO CDC` sub-rules?



##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -750,6 +751,37 @@ dmlStatementNoWith
         notMatchedBySourceClause*                                              
    #mergeIntoTable
     ;
 
+autoCdcCommand
+    : AUTO CDC INTO target=multipartIdentifier
+        autoCdcParameters
+    ;
+
+autoCdcBody
+    : AUTO CDC autoCdcParameters
+    ;
+
+autoCdcParameters
+    : FROM source=relationPrimary
+        KEYS LEFT_PAREN keys=multipartIdentifierList RIGHT_PAREN

Review Comment:
   For keys and the column selection clause, we don't actually allow multipart 
identifiers as part of the AutoCDC API contract.
   
   Should we use `identifier` instead of `multipartIdentifierList` to reflect 
this? Either way SDP will invalidate multipart identifiers, but I figure we 
should make it true by construction where possible.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
       withSchemaEvolution)
   }
 
+  protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext): 
AutoCdcIntoCommand =
+    withOrigin(ctx) {
+      val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+      val (src, keys, delete, seq, specCols, exceptCols) =
+        parseAutoCdcParams(ctx.autoCdcParameters())
+      AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+    }
+
+  protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+      LogicalPlan,
+      Seq[UnresolvedAttribute],
+      Option[Expression],
+      Expression,
+      Seq[UnresolvedAttribute],
+      Seq[UnresolvedAttribute]) =

Review Comment:
   optional: Might be nice to put this in a case class before returning, and 
then `AutoCdcIntoCommand` can also just hold a reference to said case class. Or 
maybe even just directly return an `AutoCdcIntoCommand` from this function.
   
   I generally don't like returning a tuple longer than 2-3 elements; it's hard 
to keep track of position <-> semantic value mentally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to