AnishMahto commented on code in PR #56042: URL: https://github.com/apache/spark/pull/56042#discussion_r3293578525
########## sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala: ########## @@ -0,0 +1,532 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import java.util.Locale + +import scala.util.Success + +import org.apache.spark.sql.{functions => F, AnalysisException, Column, QueryTest} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.pipelines.graph.{ + AutoCdcFlow, + AutoCdcMergeFlow, + FlowFunction, + FlowFunctionResult, + Input, + QueryContext, + QueryOrigin +} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StringType, StructField, StructType} + +/** + * Unit tests for the [[AutoCdcFlow]] data class and the augmented schema computed by + * [[AutoCdcMergeFlow]]. The tests stop at the data-class / schema surface; they do not + * exercise the full pipeline-graph resolution machinery (which is not yet wired up to AutoCDC + * flows). 
+ */ +class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { + + private val testIdentifier = TableIdentifier("cdc_target", Some("db")) + + /** A no-op [[FlowFunction]] that throws if invoked; AutoCdcFlow tests should never call it. */ + private val noOpFlowFunction: FlowFunction = new FlowFunction { + override def call( + allInputs: Set[TableIdentifier], + availableInputs: Seq[Input], + configuration: Map[String, String], + queryContext: QueryContext, + queryOrigin: QueryOrigin): FlowFunctionResult = + throw new UnsupportedOperationException( + "noOpFlowFunction.call should not be invoked from AutoCdcFlowSuite tests" + ) + } + + private val testQueryContext = + QueryContext(currentCatalog = Some("test_catalog"), currentDatabase = Some("test_db")) + + private val testChangeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + + private def newAutoCdcFlow( + identifier: TableIdentifier = testIdentifier, + destinationIdentifier: TableIdentifier = testIdentifier, + func: FlowFunction = noOpFlowFunction, + queryContext: QueryContext = testQueryContext, + sqlConf: Map[String, String] = Map.empty, + comment: Option[String] = None, + origin: QueryOrigin = QueryOrigin.empty, + changeArgs: ChangeArgs = testChangeArgs): AutoCdcFlow = { + AutoCdcFlow( + identifier = identifier, + destinationIdentifier = destinationIdentifier, + func = func, + queryContext = queryContext, + sqlConf = sqlConf, + comment = comment, + origin = origin, + changeArgs = changeArgs + ) + } + + test("AutoCdcFlow exposes its constructor fields") { + val flow = newAutoCdcFlow( + sqlConf = Map("spark.sql.shuffle.partitions" -> "8"), + comment = Some("my CDC flow") + ) + + assert(flow.identifier == testIdentifier) + assert(flow.destinationIdentifier == testIdentifier) + assert(flow.func eq noOpFlowFunction) + assert(flow.queryContext == testQueryContext) + assert(flow.sqlConf == Map("spark.sql.shuffle.partitions" -> 
"8")) + assert(flow.comment.contains("my CDC flow")) + assert(flow.origin == QueryOrigin.empty) + assert(flow.changeArgs == testChangeArgs) + } + + test("AutoCdcFlow defaults sqlConf to empty and comment to None") { + // Confirms the case-class default values match the documented contract; downstream + // registration code relies on `sqlConf` being a non-null empty map by default so that + // `defaultSqlConf ++ flowDef.sqlConf` is well-defined in [[GraphRegistrationContext]]. + val flow = AutoCdcFlow( + identifier = testIdentifier, + destinationIdentifier = testIdentifier, + func = noOpFlowFunction, + queryContext = testQueryContext, + origin = QueryOrigin.empty, + changeArgs = testChangeArgs + ) + + assert(flow.sqlConf.isEmpty) + assert(flow.comment.isEmpty) + } + + test("AutoCdcFlow.once is always false") { + // AutoCDC flows are streaming-only and must run on every batch trigger, never as a + // one-shot full-refresh-style flow. Locking this in so a future refactor doesn't + // accidentally make `once` configurable. + + // In the future we may intentionally add [[once]] support for AutoCDC flows, at which point + // this test can safely be removed. + val flow = newAutoCdcFlow() + assert(!flow.once) + } + + test("AutoCdcFlow.withSqlConf returns a new instance with the updated sqlConf") { + val original = newAutoCdcFlow(sqlConf = Map("a" -> "1")) + val updated = original.withSqlConf(Map("b" -> "2")) + + assert(updated.sqlConf == Map("b" -> "2")) + // All other fields should be preserved verbatim. + assert(updated.identifier == original.identifier) + assert(updated.destinationIdentifier == original.destinationIdentifier) + assert(updated.func eq original.func) + assert(updated.queryContext == original.queryContext) + assert(updated.comment == original.comment) + assert(updated.origin == original.origin) + assert(updated.changeArgs == original.changeArgs) + // The original must not be mutated. 
+ assert(original.sqlConf == Map("a" -> "1")) + } + + // =========================================================================================== + // AutoCdcMergeFlow.schema tests + // =========================================================================================== + + /** Materializes a successful [[FlowFunctionResult]] backed by the given source dataframe. */ + private def successfulFuncResult(sourceDf: DataFrame): FlowFunctionResult = + FlowFunctionResult( + requestedInputs = Set.empty, + batchInputs = Set.empty, + streamingInputs = Set.empty, + usedExternalInputs = Set.empty, + dataFrame = Success(sourceDf), + sqlConf = Map.empty + ) + + /** Builds a [[AutoCdcMergeFlow]] over the given source dataframe + change args. */ + private def newAutoCdcMergeFlow( + sourceDf: DataFrame, + keys: Seq[UnqualifiedColumnName] = Seq(UnqualifiedColumnName("id")), + sequencing: Column = F.col("seq"), + storedAsScdType: ScdType = ScdType.Type1, + columnSelection: Option[ColumnSelection] = None): AutoCdcMergeFlow = { + val flow = newAutoCdcFlow( + changeArgs = ChangeArgs( + keys = keys, + sequencing = sequencing, + storedAsScdType = storedAsScdType, + columnSelection = columnSelection + ) + ) + new AutoCdcMergeFlow(flow, successfulFuncResult(sourceDf)) + } + + /** A stable 3-column source CDF schema used across most schema tests. */ + private def threeColumnSourceDf(): DataFrame = { + val schema = new StructType() + .add("id", IntegerType, nullable = false) + .add("name", StringType) + .add("seq", LongType) + spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], schema) + } + + /** Convenience to extract the [[StructType]] of the projected `_cdc_metadata` column. 
*/ + private def cdcMetadataStruct(schema: StructType): StructType = + schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType] + + test( + "AutoCdcMergeFlow.schema appends _cdc_metadata to the source schema when no " + + "columnSelection is set" + ) { + val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf()) + + val expected = new StructType() + .add("id", IntegerType, nullable = false) + .add("name", StringType) + .add("seq", LongType) + .add( + StructField( + Scd1BatchProcessor.cdcMetadataColName, + Scd1BatchProcessor.cdcMetadataColSchema(LongType), + nullable = false + ) + ) + assert(resolvedFlow.schema == expected) + } + + test("AutoCdcMergeFlow.schema applies an IncludeColumns selection") { + val resolvedFlow = newAutoCdcMergeFlow( + sourceDf = threeColumnSourceDf(), + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("seq")) + ) + ) + ) + + val expected = new StructType() + .add("id", IntegerType, nullable = false) + .add("seq", LongType) + .add( + StructField( + Scd1BatchProcessor.cdcMetadataColName, + Scd1BatchProcessor.cdcMetadataColSchema(LongType), + nullable = false + ) + ) + assert(resolvedFlow.schema == expected) + } + + test("AutoCdcMergeFlow.schema applies an ExcludeColumns selection") { + val resolvedFlow = newAutoCdcMergeFlow( + sourceDf = threeColumnSourceDf(), + columnSelection = Some( + ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name"))) + ) + ) + + val expected = new StructType() + .add("id", IntegerType, nullable = false) + .add("seq", LongType) + .add( + StructField( + Scd1BatchProcessor.cdcMetadataColName, + Scd1BatchProcessor.cdcMetadataColSchema(LongType), + nullable = false + ) + ) + assert(resolvedFlow.schema == expected) + } + + test( + "AutoCdcMergeFlow.schema's _cdc_metadata struct uses the resolved sequencing data type" + ) { + // Source has a Long `seq` column; sequencing is `cast(seq as int)`, so the projected + // 
`_cdc_metadata` fields should be Int (not Long), demonstrating that the sequencing + // expression's *resolved* type drives the metadata schema. + val resolvedFlow = newAutoCdcMergeFlow( + sourceDf = threeColumnSourceDf(), + sequencing = F.col("seq").cast(IntegerType) + ) + + val metaStruct = cdcMetadataStruct(resolvedFlow.schema) + assert(metaStruct == Scd1BatchProcessor.cdcMetadataColSchema(IntegerType)) + } + + test("AutoCdcMergeFlow.schema's _cdc_metadata field is non-null with nullable inner fields") { + val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf()) + + val metaField = resolvedFlow.schema(Scd1BatchProcessor.cdcMetadataColName) + assert(!metaField.nullable, "_cdc_metadata column itself must be non-null") + + val metaStruct = metaField.dataType.asInstanceOf[StructType] + assert(metaStruct(Scd1BatchProcessor.cdcDeleteSequenceFieldName).nullable) + assert(metaStruct(Scd1BatchProcessor.cdcUpsertSequenceFieldName).nullable) + } + + test("AutoCdcMergeFlow.schema is stable across reads") { + // The schema computation calls `df.select(sequencing).schema`, which triggers Spark + // analysis. The eagerly-initialized `val` caches the result so downstream consumers get + // a stable schema instance across reads. + val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf()) + val first = resolvedFlow.schema + val second = resolvedFlow.schema + assert(first eq second, "schema should be cached as a val and return the same instance") + } + + test("AutoCdcMergeFlow rejects SCD2 at construction with AUTOCDC_SCD2_NOT_SUPPORTED") { + // Constructing the flow forces the resolved schema, which is unsupported for SCD2 today. + // Failing eagerly (rather than deferring to the first downstream `schema` read) is the + // intended UX -- pipeline graph analysis should not be able to register an SCD2 AutoCDC + // flow at all. 
+ checkError( + exception = intercept[AnalysisException] { + newAutoCdcMergeFlow( + sourceDf = threeColumnSourceDf(), + storedAsScdType = ScdType.Type2 + ) + }, + condition = "AUTOCDC_SCD2_NOT_SUPPORTED", + sqlState = "0A000", + parameters = Map.empty + ) + } + + // =========================================================================================== + // AutoCdcMergeFlow reserved-prefix validation tests + // + // The two "contract:" tests below lock in the high-level invariant that no reserved-prefix + // column name can be referenced anywhere -- not in the source change-data feed schema, and + // not in user-supplied [[ChangeArgs]] (keys or columnSelection). Together they ensure that + // (a) users cannot opt out of the reserved CDC metadata column by omitting it from the + // selected schema, and (b) users cannot opt in to (or out of) any other reserved-prefix + // name we may reserve in the future for an internal CDC concern. + // + // The remaining tests pin down case-sensitivity nuances of the source-schema validator. + // =========================================================================================== + + /** Builds an empty source df with `id` + `seq` + the supplied extra columns. 
*/ + private def sourceDfWithExtraColumns(extraColumns: (String, DataType)*): DataFrame = { + val schema = extraColumns.foldLeft( + new StructType().add("id", IntegerType, nullable = false).add("seq", LongType) + ) { case (acc, (name, dt)) => acc.add(name, dt) } + spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], schema) + } + + test( + "Contract: a source df column with the reserved AutoCDC prefix is rejected at flow " + + "construction" + ) { + val conflictingName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo" + val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType) + + checkError( + exception = intercept[AnalysisException] { + newAutoCdcMergeFlow(sourceDf) + }, + condition = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT", + sqlState = "42710", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "columnName" -> conflictingName, + "schemaName" -> "changeDataFeed", + "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix + ) + ) + } + + test( Review Comment: The existing UX is fine here IMO; the error condition already thrown here is equally valid and meaningful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
