AnishMahto commented on code in PR #56283: URL: https://github.com/apache/spark/pull/56283#discussion_r3364380570
########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala: ########## @@ -0,0 +1,547 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.SparkException +import org.apache.spark.sql.{functions => F} +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.util.QuotingUtils +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.util.ArrayImplicits._ + +/** + * Per-microbatch processor for SCD Type 2 AutoCDC flows, complying with the specified + * [[changeArgs]] configuration. + * + * @param changeArgs The CDC flow configuration. + * @param resolvedSequencingType The post-analysis [[DataType]] of the sequencing column, derived + * from the flow's resolved DataFrame at flow setup time. + */ +case class Scd2BatchProcessor( + changeArgs: ChangeArgs, + resolvedSequencingType: DataType) { + + /** + * Backtick-quoted key column names. Use when the name flows through an expression parser + * (e.g., [[F.col]]), which interprets dotted names as struct-field accesses. 
+ */ + private lazy val keysQuoted: Seq[String] = changeArgs.keys.map(_.quoted) + + /** + * Raw key column names. Use when the name is matched literally against a schema field + * (e.g., DataFrame `.join(other, usingColumns)`), where backticks are NOT stripped. + */ + private lazy val keysRaw: Seq[String] = changeArgs.keys.map(_.name) + + /** + * Reconcile a CDC microbatch into the canonical form the auxiliary- and target-table merges + * consume. + * + * Step ordering is load-bearing: the row-extension steps reference user data columns that + * target-column selection is allowed to drop, so selection runs last. Unlike SCD1, no per-key + * deduplication step is performed here - SCD2 preserves every event as part of the row's + * history, including byte-identical full-event duplicates. + * + * Duplicate event elimination (e.g., collapsing two identical events at the same sequence), + * whether across microbatches or within the same microbatch, is the responsibility of + * downstream reconciliation - not preprocessing. + * + * @param microbatchDf + * the incoming CDC microbatch. + * @return + * a dataframe that retains every input row 1:1 - no rows added, dropped, reordered, or + * merged - with the following schema, in column order: + * 1. The user columns of `microbatchDf` that survive [[ChangeArgs.columnSelection]], in + * the order they appeared in the input. + * 2. [[startAtColName]], populated with the sequence value of the row. + * 3. [[endAtColName]], populated with the sequence value of the row IFF it's a delete + * event, null otherwise. + * 4. [[cdcMetadataColName]], conforming to [[cdcMetadataColSchema]]. 
+ */ + private[autocdc] def preprocessMicrobatch(microbatchDf: DataFrame): DataFrame = { + microbatchDf + .transform(extendMicrobatchRowsWithStartAt) + .transform(extendMicrobatchRowsWithEndAt) + .transform(extendMicrobatchRowsWithCdcMetadata) + .transform(projectTargetColumnsOntoMicrobatch) + } + + /** + * Stamp each microbatch row with its currently known start-at (i.e active-from) using its + * sequencing. + */ + private def extendMicrobatchRowsWithStartAt(microbatchDf: DataFrame): DataFrame = { + microbatchDf.withColumn( + colName = Scd2BatchProcessor.startAtColName, + col = changeArgs.sequencing.cast(resolvedSequencingType) + ) + } + + /** + * Stamp each microbatch delete event row with its end time sequence, as they are instantaneous + * events. + * + * Non-deletes leave a null end, as we do not yet know if the row represents an active upsert, + * or a closed upsert. This will become clear in later reconciliation against the aux/target + * tables. + */ + private def extendMicrobatchRowsWithEndAt(microbatchDf: DataFrame): DataFrame = { + microbatchDf.withColumn( + colName = Scd2BatchProcessor.endAtColName, + col = ( + changeArgs.deleteCondition match { + case Some(deleteCondition) => + F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null)) + case None => + F.lit(null) + } + ).cast(resolvedSequencingType) + ) + } + + /** + * Project the operational CDC metadata column carrying the literal event sequence. Downstream + * merges rely on it to preserve original event lineage regardless of how rows start/end-at are + * coalesced. 
+ */ + private def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = { + microbatchDf.withColumn( + colName = AutoCdcReservedNames.cdcMetadataColName, + col = Scd2BatchProcessor.constructCdcMetadataCol( + recordStartAt = changeArgs.sequencing, + sequencingType = resolvedSequencingType + ) + ) + } + + /** + * Apply the user's target column selection while preserving the SCD2 framework columns; the + * latter are required by downstream merges and persisted to both the auxiliary and target + * tables, so users cannot deselect them. + * + * Requires the framework columns to already be present on the input. + */ + private def projectTargetColumnsOntoMicrobatch( + microbatch: DataFrame + ): DataFrame = { + val caseSensitive = microbatch.sparkSession.sessionState.conf.caseSensitiveAnalysis + + // Strip the framework columns through the same case-aware path as the user selection, for + // consistency with Scd1BatchProcessor.projectTargetColumnsOntoMicrobatch. + val dataSchema = ColumnSelection.applyToSchema( + schemaName = "microbatch", + schema = microbatch.schema, + columnSelection = Some( + ColumnSelection.ExcludeColumns( + Scd2BatchProcessor.reservedFrameworkColNames.toSeq.map(UnqualifiedColumnName(_)) + ) + ), + caseSensitive = caseSensitive + ) + val userSelectedDataSchema = + ColumnSelection.applyToSchema( + schemaName = "microbatch", + schema = dataSchema, + columnSelection = changeArgs.columnSelection, + caseSensitive = caseSensitive + ) + val finalColumnsToSelect: Seq[Column] = + userSelectedDataSchema.fieldNames.toSeq.map(colName => { + // Spark drops backticks in the schema, quote all identifiers for safety before executing + // select. Identifiers could have special characters such as '.'. 
+ F.col(QuotingUtils.quoteIdentifier(colName)) + }) ++ Seq( + F.col(Scd2BatchProcessor.startAtColName), + F.col(Scd2BatchProcessor.endAtColName), + F.col(AutoCdcReservedNames.cdcMetadataColName) + ) + microbatch.select(finalColumnsToSelect: _*) + } + + /** + * For each key in the preprocessed microbatch, compute the earliest [[recordStartAtFieldName]] + * across the key's events. + * + * @param preprocessedBatchDf + * a validated and preprocessed microbatch as produced by [[preprocessMicrobatch]] - in + * particular, non-null key columns and a non-null [[recordStartAtFieldName]] on every row. + * @return + * a dataframe containing one row per distinct key. Schema, in column order: + * 1. The key columns ([[ChangeArgs.keys]]), in their declared order. + * 2. [[minSequenceColName]], carrying the min [[recordStartAtFieldName]] + * across all records within the microbatch for that key. + */ + private[autocdc] def computeMinimumSequencePerKey(preprocessedBatchDf: DataFrame): DataFrame = { + val recordStartAt = + Scd2BatchProcessor.recordStartAtOf(F.col(AutoCdcReservedNames.cdcMetadataColName)) + preprocessedBatchDf + .groupBy(keysQuoted.map(F.col): _*) + .agg(F.min(recordStartAt).alias(Scd2BatchProcessor.minSequenceColName)) + } + + /** + * Find the auxiliary-table rows whose state matters for reconciling the microbatch. + * + * @param rawAuxiliaryTableDf + * the auxiliary table in its native schema, which is expected to contain + * [[deletedByBatchIdColName]] in addition to all of the columns in the target table. + * @param perKeyMinimumSequenceInMicrobatchDf + * one row per distinct key as produced by [[computeMinimumSequencePerKey]], representing + * the minimum sequence for that key in the microbatch. + * @param batchId + * the underlying Spark streaming query's batchId, which serves as the idempotency key. 
+ * @return + * a dataframe containing all the affected aux rows, with the aux-only + * [[deletedByBatchIdColName]] column dropped so the result is union-compatible with + * preprocessed microbatch rows and target-table rows downstream. + */ + private[autocdc] def findAffectedRowsFromAuxiliaryTable( + rawAuxiliaryTableDf: DataFrame, + perKeyMinimumSequenceInMicrobatchDf: DataFrame, + batchId: Long + ): DataFrame = { + val auxTableRecordStartAtField = Scd2BatchProcessor.recordStartAtOf( + F.col(AutoCdcReservedNames.cdcMetadataColName) + ) + val auxTableDeletedByBatchIdCol = F.col(Scd2BatchProcessor.deletedByBatchIdColName) + + val reducedAuxiliaryTableDf = rawAuxiliaryTableDf + .filter( + // Ignore any auxiliary table rows logically deleted by any microbatch other than this one Review Comment: I think "idempotency key" might be the wrong terminology, so I reworded the comment - let me know if it makes more sense now. Hopefully it's clearer now, but we are checking the `deletedByBatchId` for all auxiliary rows. Btw, `deletedByBatchId` here doesn't have anything to do with deletion CDC events specifically; "delete" in this context means a row has been logically deleted from the table via MERGE. In the aux table, there are both tombstones (deletion events) and no-op upserts; both can be deleted via MERGE in later reconciliation, at which point they will first be soft/logically deleted (non-null `deletedByBatchId`) before they can eventually [and safely] be hard deleted in a future microbatch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
