szehon-ho commented on code in PR #56042:
URL: https://github.com/apache/spark/pull/56042#discussion_r3291646331
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala:
##########
@@ -194,3 +243,109 @@ class AppendOnceFlow(
override val once = true
}
+
+/**
+ * A resolved flow that applies a CDC event stream to a target table via MERGE, in accordance to
+ * the configured [[flow.changeArgs]].
+ */
+class AutoCdcMergeFlow(
+ val flow: AutoCdcFlow,
+ val funcResult: FlowFunctionResult
+) extends ResolvedFlow {
+ requireReservedPrefixAbsentInSourceColumns()
+
+ def changeArgs: ChangeArgs = flow.changeArgs
+
+ /**
+ * Returns the augmented output schema of this flow, which can differ from the schema of the
+ * source change-data-feed dataframe.
+ *
+ * The source dataframe's schema describes the incoming CDC events; the augmented schema here
+ * applies the user-specified [[ColumnSelection]] and appends the SCD-specific metadata
+ * columns that the AutoCDC MERGE engine projects onto the target table. Downstream
+ * dependencies in the pipeline see this augmented schema.
+ */
+ override val schema: StructType = {
Review Comment:
`AutoCdcMergeFlow` overrides `schema` with the augmented, target-facing schema
(column selection plus CDC metadata), but it still inherits `ResolvedFlow.load()`,
which returns the raw CDF `df`. If any code path reads this flow as an `Input`
and assumes that `load()` matches `schema`, the mismatch could be surprising.
This is probably fine if AutoCDC outputs are always materialized to tables first,
but it is worth confirming when the execution PR wires this up: either override
`load()` or document that the augmented schema is for inference/planning only.
Non-blocking.
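To make the concern concrete, here is a minimal, dependency-free Scala sketch that models a dataframe as a list of column names. `augmentedColumns` follows what the scaladoc above says `schema` does (apply the optional column selection, then append `_cdc_metadata`), and `mismatch` shows what a consumer comparing `load()` against `schema` would observe. Everything in it (`SchemaMismatchSketch`, `Include`, `Exclude`) is a hypothetical stand-in, not the pipelines API, and keeping source order for an include list is an assumption.

```scala
// Hypothetical, dependency-free model of the AutoCdcMergeFlow schema vs. load() mismatch.
// Columns are plain names; none of these types are the real Spark/pipelines classes.
object SchemaMismatchSketch {
  sealed trait Selection
  final case class Include(names: Seq[String]) extends Selection
  final case class Exclude(names: Seq[String]) extends Selection

  // Metadata column name as it appears in this PR's test names.
  val CdcMetadataCol = "_cdc_metadata"

  /** Planning-time schema: apply the optional selection (keeping source order), then append metadata. */
  def augmentedColumns(source: Seq[String], selection: Option[Selection]): Seq[String] = {
    val selected = selection match {
      case None                 => source
      case Some(Include(names)) => source.filter(c => names.contains(c))
      case Some(Exclude(names)) => source.filterNot(c => names.contains(c))
    }
    selected :+ CdcMetadataCol
  }

  /**
   * Compares what load() would return (the raw CDF columns) with what schema advertises.
   * Returns (columns advertised but not loaded, columns loaded but not advertised).
   */
  def mismatch(loaded: Seq[String], advertised: Seq[String]): (Set[String], Set[String]) =
    (advertised.toSet.diff(loaded.toSet), loaded.toSet.diff(advertised.toSet))
}
```

With the suite's `id`/`name`/`seq` source and `Exclude(Seq("name"))`, `augmentedColumns` returns `Seq("id", "seq", "_cdc_metadata")`, and `mismatch` against the raw columns returns `(Set("_cdc_metadata"), Set("name"))`. A real test asserting equality between the loaded dataframe's schema and `schema` would be one way to pin down whichever contract the execution PR picks.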
##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import java.util.Locale
+
+import scala.util.Success
+
+import org.apache.spark.sql.{functions => F, AnalysisException, Column, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.graph.{
+ AutoCdcFlow,
+ AutoCdcMergeFlow,
+ FlowFunction,
+ FlowFunctionResult,
+ Input,
+ QueryContext,
+ QueryOrigin
+}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StringType, StructField, StructType}
+
+/**
Review Comment:
Nit: this comment says the suite "does not exercise the full pipeline-graph
resolution machinery," but `ConnectValidPipelineSuite` now has `"AutoCdcFlow
registers and resolves to AutoCdcMergeFlow"`. Consider updating to something
like:
> Unit tests for [[AutoCdcFlow]] and [[AutoCdcMergeFlow]] schema/validation
> at the data-class layer. End-to-end graph resolution is covered separately in
> [[ConnectValidPipelineSuite]].
Non-blocking.
##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala:
##########
@@ -0,0 +1,532 @@
+/**
+ * Unit tests for the [[AutoCdcFlow]] data class and the augmented schema computed by
+ * [[AutoCdcMergeFlow]]. The tests stop at the data-class / schema surface; they do not
+ * exercise the full pipeline-graph resolution machinery (which is not yet wired up to AutoCDC
+ * flows).
+ */
+class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
+
+ private val testIdentifier = TableIdentifier("cdc_target", Some("db"))
+
+ /** A no-op [[FlowFunction]] that throws if invoked; AutoCdcFlow tests should never call it. */
+ private val noOpFlowFunction: FlowFunction = new FlowFunction {
+ override def call(
+ allInputs: Set[TableIdentifier],
+ availableInputs: Seq[Input],
+ configuration: Map[String, String],
+ queryContext: QueryContext,
+ queryOrigin: QueryOrigin): FlowFunctionResult =
+ throw new UnsupportedOperationException(
+ "noOpFlowFunction.call should not be invoked from AutoCdcFlowSuite tests"
+ )
+ }
+
+ private val testQueryContext =
+ QueryContext(currentCatalog = Some("test_catalog"), currentDatabase = Some("test_db"))
+
+ private val testChangeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+
+ private def newAutoCdcFlow(
+ identifier: TableIdentifier = testIdentifier,
+ destinationIdentifier: TableIdentifier = testIdentifier,
+ func: FlowFunction = noOpFlowFunction,
+ queryContext: QueryContext = testQueryContext,
+ sqlConf: Map[String, String] = Map.empty,
+ comment: Option[String] = None,
+ origin: QueryOrigin = QueryOrigin.empty,
+ changeArgs: ChangeArgs = testChangeArgs): AutoCdcFlow = {
+ AutoCdcFlow(
+ identifier = identifier,
+ destinationIdentifier = destinationIdentifier,
+ func = func,
+ queryContext = queryContext,
+ sqlConf = sqlConf,
+ comment = comment,
+ origin = origin,
+ changeArgs = changeArgs
+ )
+ }
+
+ test("AutoCdcFlow exposes its constructor fields") {
+ val flow = newAutoCdcFlow(
+ sqlConf = Map("spark.sql.shuffle.partitions" -> "8"),
+ comment = Some("my CDC flow")
+ )
+
+ assert(flow.identifier == testIdentifier)
+ assert(flow.destinationIdentifier == testIdentifier)
+ assert(flow.func eq noOpFlowFunction)
+ assert(flow.queryContext == testQueryContext)
+ assert(flow.sqlConf == Map("spark.sql.shuffle.partitions" -> "8"))
+ assert(flow.comment.contains("my CDC flow"))
+ assert(flow.origin == QueryOrigin.empty)
+ assert(flow.changeArgs == testChangeArgs)
+ }
+
+ test("AutoCdcFlow defaults sqlConf to empty and comment to None") {
+ // Confirms the case-class default values match the documented contract; downstream
+ // registration code relies on `sqlConf` being a non-null empty map by default so that
+ // `defaultSqlConf ++ flowDef.sqlConf` is well-defined in [[GraphRegistrationContext]].
+ val flow = AutoCdcFlow(
+ identifier = testIdentifier,
+ destinationIdentifier = testIdentifier,
+ func = noOpFlowFunction,
+ queryContext = testQueryContext,
+ origin = QueryOrigin.empty,
+ changeArgs = testChangeArgs
+ )
+
+ assert(flow.sqlConf.isEmpty)
+ assert(flow.comment.isEmpty)
+ }
+
+ test("AutoCdcFlow.once is always false") {
+ // AutoCDC flows are streaming-only and must run on every batch trigger, never as a
+ // one-shot full-refresh-style flow. Locking this in so a future refactor doesn't
+ // accidentally make `once` configurable.
+
+ // In the future we may intentionally add [[once]] support for AutoCDC flows, at which point
+ // this test can safely be removed.
+ val flow = newAutoCdcFlow()
+ assert(!flow.once)
+ }
+
+ test("AutoCdcFlow.withSqlConf returns a new instance with the updated sqlConf") {
+ val original = newAutoCdcFlow(sqlConf = Map("a" -> "1"))
+ val updated = original.withSqlConf(Map("b" -> "2"))
+
+ assert(updated.sqlConf == Map("b" -> "2"))
+ // All other fields should be preserved verbatim.
+ assert(updated.identifier == original.identifier)
+ assert(updated.destinationIdentifier == original.destinationIdentifier)
+ assert(updated.func eq original.func)
+ assert(updated.queryContext == original.queryContext)
+ assert(updated.comment == original.comment)
+ assert(updated.origin == original.origin)
+ assert(updated.changeArgs == original.changeArgs)
+ // The original must not be mutated.
+ assert(original.sqlConf == Map("a" -> "1"))
+ }
+
+ // ===========================================================================================
+ // AutoCdcMergeFlow.schema tests
+ // ===========================================================================================
+
+ /** Materializes a successful [[FlowFunctionResult]] backed by the given source dataframe. */
+ private def successfulFuncResult(sourceDf: DataFrame): FlowFunctionResult =
+ FlowFunctionResult(
+ requestedInputs = Set.empty,
+ batchInputs = Set.empty,
+ streamingInputs = Set.empty,
+ usedExternalInputs = Set.empty,
+ dataFrame = Success(sourceDf),
+ sqlConf = Map.empty
+ )
+
+ /** Builds a [[AutoCdcMergeFlow]] over the given source dataframe + change args. */
+ private def newAutoCdcMergeFlow(
+ sourceDf: DataFrame,
+ keys: Seq[UnqualifiedColumnName] = Seq(UnqualifiedColumnName("id")),
+ sequencing: Column = F.col("seq"),
+ storedAsScdType: ScdType = ScdType.Type1,
+ columnSelection: Option[ColumnSelection] = None): AutoCdcMergeFlow = {
+ val flow = newAutoCdcFlow(
+ changeArgs = ChangeArgs(
+ keys = keys,
+ sequencing = sequencing,
+ storedAsScdType = storedAsScdType,
+ columnSelection = columnSelection
+ )
+ )
+ new AutoCdcMergeFlow(flow, successfulFuncResult(sourceDf))
+ }
+
+ /** A stable 3-column source CDF schema used across most schema tests. */
+ private def threeColumnSourceDf(): DataFrame = {
+ val schema = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("name", StringType)
+ .add("seq", LongType)
+ spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], schema)
+ }
+
+ /** Convenience to extract the [[StructType]] of the projected `_cdc_metadata` column. */
+ private def cdcMetadataStruct(schema: StructType): StructType =
+ schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType]
+
+ test(
+ "AutoCdcMergeFlow.schema appends _cdc_metadata to the source schema when no " +
+ "columnSelection is set"
+ ) {
+ val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+
+ val expected = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("name", StringType)
+ .add("seq", LongType)
+ .add(
+ StructField(
+ Scd1BatchProcessor.cdcMetadataColName,
+ Scd1BatchProcessor.cdcMetadataColSchema(LongType),
+ nullable = false
+ )
+ )
+ assert(resolvedFlow.schema == expected)
+ }
+
+ test("AutoCdcMergeFlow.schema applies an IncludeColumns selection") {
+ val resolvedFlow = newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ columnSelection = Some(
+ ColumnSelection.IncludeColumns(
+ Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("seq"))
+ )
+ )
+ )
+
+ val expected = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("seq", LongType)
+ .add(
+ StructField(
+ Scd1BatchProcessor.cdcMetadataColName,
+ Scd1BatchProcessor.cdcMetadataColSchema(LongType),
+ nullable = false
+ )
+ )
+ assert(resolvedFlow.schema == expected)
+ }
+
+ test("AutoCdcMergeFlow.schema applies an ExcludeColumns selection") {
+ val resolvedFlow = newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ columnSelection = Some(
+ ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name")))
+ )
+ )
+
+ val expected = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("seq", LongType)
+ .add(
+ StructField(
+ Scd1BatchProcessor.cdcMetadataColName,
+ Scd1BatchProcessor.cdcMetadataColSchema(LongType),
+ nullable = false
+ )
+ )
+ assert(resolvedFlow.schema == expected)
+ }
+
+ test(
+ "AutoCdcMergeFlow.schema's _cdc_metadata struct uses the resolved sequencing data type"
+ ) {
+ // Source has a Long `seq` column; sequencing is `cast(seq as int)`, so the projected
+ // `_cdc_metadata` fields should be Int (not Long), demonstrating that the sequencing
+ // expression's *resolved* type drives the metadata schema.
+ val resolvedFlow = newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ sequencing = F.col("seq").cast(IntegerType)
+ )
+
+ val metaStruct = cdcMetadataStruct(resolvedFlow.schema)
+ assert(metaStruct == Scd1BatchProcessor.cdcMetadataColSchema(IntegerType))
+ }
+
+ test("AutoCdcMergeFlow.schema's _cdc_metadata field is non-null with nullable inner fields") {
+ val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+
+ val metaField = resolvedFlow.schema(Scd1BatchProcessor.cdcMetadataColName)
+ assert(!metaField.nullable, "_cdc_metadata column itself must be non-null")
+
+ val metaStruct = metaField.dataType.asInstanceOf[StructType]
+ assert(metaStruct(Scd1BatchProcessor.cdcDeleteSequenceFieldName).nullable)
+ assert(metaStruct(Scd1BatchProcessor.cdcUpsertSequenceFieldName).nullable)
+ }
+
+ test("AutoCdcMergeFlow.schema is stable across reads") {
+ // The schema computation calls `df.select(sequencing).schema`, which triggers Spark
+ // analysis. The eagerly-initialized `val` caches the result so downstream consumers get
+ // a stable schema instance across reads.
+ val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+ val first = resolvedFlow.schema
+ val second = resolvedFlow.schema
+ assert(first eq second, "schema should be cached as a val and return the same instance")
+ }
+
+ test("AutoCdcMergeFlow rejects SCD2 at construction with AUTOCDC_SCD2_NOT_SUPPORTED") {
+ // Constructing the flow forces the resolved schema, which is unsupported for SCD2 today.
+ // Failing eagerly (rather than deferring to the first downstream `schema` read) is the
+ // intended UX -- pipeline graph analysis should not be able to register an SCD2 AutoCDC
+ // flow at all.
+ checkError(
+ exception = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ storedAsScdType = ScdType.Type2
+ )
+ },
+ condition = "AUTOCDC_SCD2_NOT_SUPPORTED",
+ sqlState = "0A000",
+ parameters = Map.empty
+ )
+ }
+
+ // ===========================================================================================
+ // AutoCdcMergeFlow reserved-prefix validation tests
+ //
+ // The two "contract:" tests below lock in the high-level invariant that no reserved-prefix
+ // column name can be referenced anywhere -- not in the source change-data feed schema, and
+ // not in user-supplied [[ChangeArgs]] (keys or columnSelection). Together they ensure that
+ // (a) users cannot opt out of the reserved CDC metadata column by omitting it from the
+ // selected schema, and (b) users cannot opt in to (or out of) any other reserved-prefix
+ // name we may reserve in the future for an internal CDC concern.
+ //
+ // The remaining tests pin down case-sensitivity nuances of the source-schema validator.
+ // ===========================================================================================
+
+ /** Builds an empty source df with `id` + `seq` + the supplied extra columns. */
+ private def sourceDfWithExtraColumns(extraColumns: (String, DataType)*): DataFrame = {
+ val schema = extraColumns.foldLeft(
+ new StructType().add("id", IntegerType, nullable = false).add("seq", LongType)
+ ) { case (acc, (name, dt)) => acc.add(name, dt) }
+ spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], schema)
+ }
+
+ test(
+ "Contract: a source df column with the reserved AutoCDC prefix is rejected at flow " +
+ "construction"
+ ) {
+ val conflictingName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo"
+ val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType)
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(sourceDf)
+ },
+ condition = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT",
+ sqlState = "42710",
+ parameters = Map(
+ "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+ "columnName" -> conflictingName,
+ "schemaName" -> "changeDataFeed",
+ "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix
+ )
+ )
+ }
+
+ test(
Review Comment:
The contract test correctly documents that reserved-prefix names in
`ChangeArgs` fail indirectly:
- keys → `AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA`
- columnSelection → `AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA`

The behavior is correct (the name isn't in the source schema), but the error
messages won't mention the reservation. Optional polish for a later PR if we
want clearer UX: a dedicated `AUTOCDC_RESERVED_COLUMN_NAME_IN_CHANGE_ARGS`
condition that fires before the schema lookup.
Non-blocking for this PR.
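A rough sketch of the ordering suggested above, again in plain Scala: the reserved-prefix check over keys and selected column names runs before any schema lookup, so the user sees the reservation rather than a generic not-found error. The three condition names come from this review; the prefix value `__autocdc_`, the case-insensitive match, the include-only selection, and the `firstError` helper are all hypothetical (the real prefix is `Scd1BatchProcessor.reservedColumnNamePrefix`, whose value is not shown in this diff).

```scala
import java.util.Locale

// Hypothetical sketch of validating ChangeArgs-style inputs before any schema lookup.
// Plain Scala stand-ins only; these are not the pipelines classes.
object ChangeArgsValidationSketch {
  // Placeholder prefix; the real value lives in Scd1BatchProcessor.reservedColumnNamePrefix.
  val ReservedPrefix = "__autocdc_"

  // Assumed case-insensitive match, mirroring the CaseInsensitive label in the contract test.
  private def hasReservedPrefix(name: String): Boolean =
    name.toLowerCase(Locale.ROOT).startsWith(ReservedPrefix.toLowerCase(Locale.ROOT))

  /**
   * Returns the first error condition that would fire, or None if the inputs are valid.
   * `includeColumns` models only an include-style column selection (None = keep all).
   */
  def firstError(
      keys: Seq[String],
      includeColumns: Option[Seq[String]],
      sourceColumns: Seq[String]): Option[String] = {
    val referenced = keys ++ includeColumns.getOrElse(Seq.empty)
    if (referenced.exists(hasReservedPrefix)) {
      // Fires before any lookup, so the message can name the reservation explicitly.
      Some("AUTOCDC_RESERVED_COLUMN_NAME_IN_CHANGE_ARGS")
    } else {
      val selected = includeColumns.getOrElse(sourceColumns)
      if (!selected.forall(c => sourceColumns.contains(c))) {
        Some("AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA")
      } else if (!keys.forall(k => selected.contains(k))) {
        Some("AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA")
      } else {
        None
      }
    }
  }
}
```

Dropping the first branch reproduces the current behavior described above: a reserved-prefix key surfaces as `AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA` and a reserved-prefix selected column as `AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA`.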
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]