jose-torres commented on code in PR #56283:
URL: https://github.com/apache/spark/pull/56283#discussion_r3371065904


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala:
##########
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{functions => F}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.util.QuotingUtils
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.ArrayImplicits._
+
+/**
+ * Per-microbatch processor for SCD Type 2 AutoCDC flows, complying with the specified
+ * [[changeArgs]] configuration.
+ *
+ * @param changeArgs The CDC flow configuration.
+ * @param resolvedSequencingType The post-analysis [[DataType]] of the sequencing column, derived
+ *                               from the flow's resolved DataFrame at flow setup time.
+ */
+case class Scd2BatchProcessor(
+    changeArgs: ChangeArgs,
+    resolvedSequencingType: DataType) {
+
+  /**
+   * Backtick-quoted key column names. Use when the name flows through an expression parser
+   * (e.g., [[F.col]]), which interprets dotted names as struct-field accesses.
+   */
+  private lazy val keysQuoted: Seq[String] = changeArgs.keys.map(_.quoted)
+
+  /**
+   * Raw key column names. Use when the name is matched literally against a schema field
+   * (e.g., DataFrame `.join(other, usingColumns)`), where backticks are NOT stripped.
+   */
+  private lazy val keysRaw: Seq[String] = changeArgs.keys.map(_.name)
+
+  /**
+   * Reconcile a CDC microbatch into the canonical form the auxiliary- and target-table merges
+   * consume.
+   *
+   * Step ordering is load-bearing: the row-extension steps reference user data columns that
+   * target-column selection is allowed to drop, so selection runs last. Unlike SCD1, no per-key
+   * deduplication step is performed here - SCD2 preserves every event as part of the row's
+   * history, including byte-identical full-event duplicates.
+   *
+   * Duplicate event elimination (e.g., collapsing two identical events at the same sequence),
+   * whether across microbatches or within the same microbatch, is the responsibility of
+   * downstream reconciliation - not preprocessing.
+   *
+   * @param microbatchDf
+   *   the incoming CDC microbatch.
+   * @return
+   *   a dataframe that retains every input row 1:1 - no rows added, dropped, reordered, or
+   *   merged - with the following schema, in column order:
+   *     1. The user columns of `microbatchDf` that survive [[ChangeArgs.columnSelection]], in
+   *        the order they appeared in the input.
+   *     2. [[startAtColName]], populated with the sequence value of the row.
+   *     3. [[endAtColName]], populated with the sequence value of the row IFF it's a delete
+   *        event, null otherwise.
+   *     4. [[cdcMetadataColName]], conforming to [[cdcMetadataColSchema]].
+   */
+  private[autocdc] def preprocessMicrobatch(microbatchDf: DataFrame): DataFrame = {
+    microbatchDf
+      .transform(extendMicrobatchRowsWithStartAt)
+      .transform(extendMicrobatchRowsWithEndAt)
+      .transform(extendMicrobatchRowsWithCdcMetadata)
+      .transform(projectTargetColumnsOntoMicrobatch)
+  }
+
+  /**
+   * Stamp each microbatch row with its currently known start-at (i.e active-from) using its
+   * sequencing.
+   */
+  private def extendMicrobatchRowsWithStartAt(microbatchDf: DataFrame): DataFrame = {
+    microbatchDf.withColumn(
+      colName = Scd2BatchProcessor.startAtColName,
+      col = changeArgs.sequencing.cast(resolvedSequencingType)
+    )
+  }
+
+  /**
+   * Stamp each microbatch delete event row with its end time sequence, as they are instantaneous
+   * events.
+   *
+   * Non-deletes leave a null end, as we do not yet know if the row represents an active upsert,
+   * or a closed upsert. This will become clear in later reconciliation against the aux/target
+   * tables.
+   */
+  private def extendMicrobatchRowsWithEndAt(microbatchDf: DataFrame): DataFrame = {
+    microbatchDf.withColumn(
+      colName = Scd2BatchProcessor.endAtColName,
+      col = (
+        changeArgs.deleteCondition match {
+          case Some(deleteCondition) =>
+            F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null))
+          case None =>
+            F.lit(null)
+        }
+      ).cast(resolvedSequencingType)
+    )
+  }
+
+  /**
+   * Project the operational CDC metadata column carrying the literal event sequence. Downstream
+   * merges rely on it to preserve original event lineage regardless of how rows start/end-at are
+   * coalesced.
+   */
+  private def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = {
+    microbatchDf.withColumn(
+      colName = AutoCdcReservedNames.cdcMetadataColName,
+      col = Scd2BatchProcessor.constructCdcMetadataCol(
+        recordStartAt = changeArgs.sequencing,
+        sequencingType = resolvedSequencingType
+      )
+    )
+  }
+
+  /**
+   * Apply the user's target column selection while preserving the SCD2 framework columns; the
+   * latter are required by downstream merges and persisted to both the auxiliary and target
+   * tables, so users cannot deselect them.
+   *
+   * Requires the framework columns to already be present on the input.
+   */
+  private def projectTargetColumnsOntoMicrobatch(
+      microbatch: DataFrame
+  ): DataFrame = {
+    val caseSensitive = microbatch.sparkSession.sessionState.conf.caseSensitiveAnalysis
+
+    // Strip the framework columns through the same case-aware path as the user selection, for
+    // consistency with Scd1BatchProcessor.projectTargetColumnsOntoMicrobatch.
+    val dataSchema = ColumnSelection.applyToSchema(
+      schemaName = "microbatch",
+      schema = microbatch.schema,
+      columnSelection = Some(
+        ColumnSelection.ExcludeColumns(
+          Scd2BatchProcessor.reservedFrameworkColNames.toSeq.map(UnqualifiedColumnName(_))
+        )
+      ),
+      caseSensitive = caseSensitive
+    )
+    val userSelectedDataSchema =
+      ColumnSelection.applyToSchema(
+        schemaName = "microbatch",
+        schema = dataSchema,
+        columnSelection = changeArgs.columnSelection,
+        caseSensitive = caseSensitive
+      )
+    val finalColumnsToSelect: Seq[Column] =
+      userSelectedDataSchema.fieldNames.toSeq.map(colName => {
+        // Spark drops backticks in the schema, quote all identifiers for safety before executing
+        // select. Identifiers could have special characters such as '.'.
+        F.col(QuotingUtils.quoteIdentifier(colName))
+      }) ++ Seq(
+        F.col(Scd2BatchProcessor.startAtColName),
+        F.col(Scd2BatchProcessor.endAtColName),
+        F.col(AutoCdcReservedNames.cdcMetadataColName)
+      )
+    microbatch.select(finalColumnsToSelect: _*)
+  }
+
+  /**
+   * For each key in the preprocessed microbatch, compute the earliest [[recordStartAtFieldName]]
+   * across the key's events.
+   *
+   * @param preprocessedBatchDf
+   *   a validated and preprocessed microbatch as produced by [[preprocessMicrobatch]] - in
+   *   particular, non-null key columns and a non-null [[recordStartAtFieldName]] on every row.
+   * @return
+   *   a dataframe containing one row per distinct key. Schema, in column order:
+   *     1. The key columns ([[ChangeArgs.keys]]), in their declared order.
+   *     2. [[minSequenceColName]], carrying the min [[recordStartAtFieldName]]
+   *        across all records within the microbatch for that key.
+   */
+  private[autocdc] def computeMinimumSequencePerKey(preprocessedBatchDf: DataFrame): DataFrame = {
+    val recordStartAt =
+      Scd2BatchProcessor.recordStartAtOf(F.col(AutoCdcReservedNames.cdcMetadataColName))
+    preprocessedBatchDf
+      .groupBy(keysQuoted.map(F.col): _*)
+      .agg(F.min(recordStartAt).alias(Scd2BatchProcessor.minSequenceColName))
+  }
+
+  /**
+   * Find the auxiliary-table rows whose state matters for reconciling the microbatch.
+   *
+   * @param rawAuxiliaryTableDf
+   *   the auxiliary table in its native schema, which is expected to contain
+   *   [[deletedByBatchIdColName]] in addition to all of the columns in the target table.
+   * @param perKeyMinimumSequenceInMicrobatchDf
+   *   one row per distinct key as produced by [[computeMinimumSequencePerKey]], representing
+   *   the minimum sequence for that key in the microbatch.
+   * @param batchId
+   *   the underlying Spark streaming query's batchId, which serves as the idempotency key.
+   * @return
+   *   a dataframe containing all the affected aux rows, with the aux-only
+   *   [[deletedByBatchIdColName]] column dropped so the result is union-compatible with
+   *   preprocessed microbatch rows and target-table rows downstream.
+   */
+  private[autocdc] def findAffectedRowsFromAuxiliaryTable(
+      rawAuxiliaryTableDf: DataFrame,
+      perKeyMinimumSequenceInMicrobatchDf: DataFrame,
+      batchId: Long
+  ): DataFrame = {
+    val auxTableRecordStartAtField = Scd2BatchProcessor.recordStartAtOf(
+      F.col(AutoCdcReservedNames.cdcMetadataColName)
+    )
+    val auxTableDeletedByBatchIdCol = F.col(Scd2BatchProcessor.deletedByBatchIdColName)
+
+    val reducedAuxiliaryTableDf = rawAuxiliaryTableDf
+      .filter(
+        // Ignore any auxiliary table rows logically deleted by any microbatch other than this one
+        // itself. Recall this execution could be a retry attempt on the same microbatch, and
+        // batchId is our idempotency key.
+        auxTableDeletedByBatchIdCol.isNull ||
+          auxTableDeletedByBatchIdCol === F.lit(batchId)
+      )
+      // Drop the aux-only idempotency column so the output schema matches target-table rows
+      // and preprocessed-microbatch rows (which share the same canonical SCD2 row schema).
+      .drop(Scd2BatchProcessor.deletedByBatchIdColName)
+
+    val perKeyMinimumSequenceInMicrobatchCol = F.col(Scd2BatchProcessor.minSequenceColName)
+
+    // Per key, identify the sequence value associated with the anchor row in the aux table.
+    //
+    // The anchor row is the aux row with the largest [[recordStartAtFieldName]] strictly less
+    // than the min sequence in the incoming microbatch for that key. The reconciler needs this
+    // "left context" in two cases:
+    //   (1) Incoming no-op upsert: without the anchor, it would look like a new run head, when in
+    //       reality it's a part of an existing no-op run/head.
+    //   (2) Incoming state-changing upsert that bisects two aux no-ops: the anchor surfaces
+    //       the before-half so both halves can be promoted to target. (The after-half is
+    //       picked up by the >= minSeq branch.)
+    //
+    // Because no-op upserts are stored only in the aux table, the anchor concept only exists when
+    // pulling in rows from the aux table, and is not relevant for the target table.
+    //
+    // Keys with no aux row strictly before the min sequence have no anchor; their affected set
+    // reduces to "all aux rows at or after the min sequence."
+    //
+    // The shape of this DataFrame is: [key1, key2, ... keyN, anchorSequence]
+    val perKeyAnchorSequenceDf = reducedAuxiliaryTableDf
+      // The number of rows in [[perKeyMinimumSequenceInMicrobatchDf]] is bounded by the
+      // number of unique keys in the microbatch, which should typically be small. The
+      // auxiliary table should generally also be small, containing only no-op upsert runs
+      // and tombstones per key. Therefore this join should be cheap, and broadcast joinable.
+      .join(perKeyMinimumSequenceInMicrobatchDf, keysRaw)
+      .filter(auxTableRecordStartAtField < perKeyMinimumSequenceInMicrobatchCol)
+      .groupBy(keysQuoted.map(F.col): _*)
+      .agg(
+        F.max(auxTableRecordStartAtField).as(Scd2BatchProcessor.anchorSequenceColName)
+      )
+    val anchorSequenceCol = F.col(Scd2BatchProcessor.anchorSequenceColName)
+    val auxRowIsAnchorRow = auxTableRecordStartAtField === anchorSequenceCol
+
+    // Now that we have the minimum sequence in the microbatch and the sequence of the anchor row,
+    // we have enough information to compute the full set of auxiliary rows that affect or are
+    // affected by the microbatch.
+    val auxRowIsAtOrAfterMinSequenceInMicrobatch =
+      auxTableRecordStartAtField >= perKeyMinimumSequenceInMicrobatchCol
+
+    val auxRowAffectsMicrobatch = auxRowIsAtOrAfterMinSequenceInMicrobatch || auxRowIsAnchorRow
+
+    val affectedRowsFromAuxiliaryTable = reducedAuxiliaryTableDf
+      // Per row, project the minimum microbatch sequence and anchor sequence for that row's key
+      // set onto the row, so the affected-row predicate can be evaluated in a single filter.
+      .join(perKeyMinimumSequenceInMicrobatchDf, keysRaw)
+      .join(
+        perKeyAnchorSequenceDf,
+        keysRaw,
+        joinType = "left"
+      )
+      .filter(auxRowAffectsMicrobatch)
+      .drop(perKeyMinimumSequenceInMicrobatchCol, anchorSequenceCol)
+
+    affectedRowsFromAuxiliaryTable
+  }
+
+  /**
+   * Find the target-table rows whose state matters for reconciling the microbatch.
+   *
+   * @param targetTableDf
+   *   the target table in its native schema.
+   * @param perKeyMinimumSequenceInMicrobatchDf
+   *   one row per distinct key as produced by [[computeMinimumSequencePerKey]], representing
+   *   the minimum sequence for that key in the microbatch.
+   * @return
+   *   a dataframe containing the affected target rows, with all columns passed-through.
+   */
+  private[autocdc] def findAffectedRowsFromTargetTable(
+      targetTableDf: DataFrame,
+      perKeyMinimumSequenceInMicrobatchDf: DataFrame
+  ): DataFrame = {
+    val targetEndAtCol = F.col(Scd2BatchProcessor.endAtColName)
+    val perKeyMinimumSequenceInMicrobatchCol = F.col(Scd2BatchProcessor.minSequenceColName)
+
+    // Per key, identify all the rows in the target table that may be affected by the
+    // incoming microbatch.
+    //
+    // Unlike the auxiliary table, the target table holds visible rows only: no hidden open
+    // no-op upsert rows, no tombstones. Visible rows for a given key form a non-overlapping
+    // interval partition over the sequencing axis, and at most one row has a null [[endAtColName]]
+    // (the currently active row per key).
+    //
+    // Hence we can simply grab all rows that were active at some point after the min sequencing
+    // per key, which can be determined entirely by the row's [[endAtColName]].
+    val isCurrentlyActiveRow = targetEndAtCol.isNull
+
+    // `>=` (rather than strict `>`) additionally pulls in the row that closes exactly at the

Review Comment:
   OK. I'm a bit torn on this: ideally we'd avoid these action-at-a-distance
assumptions, but I don't see a way to do that without adding extra, pointless
(and potentially costly) structure to the query just to check for what is
ultimately an edge case of an edge case. I suppose the fuzz testing would catch
any violation of this invariant. Once we have more of the implementation, I
would also consider adding a test that ingests against a target table that does
have overlapping records, just to capture the behavior in case something crazy
happens.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to