jose-torres commented on code in PR #56311:
URL: https://github.com/apache/spark/pull/56311#discussion_r3462843843


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala:
##########
@@ -547,3 +879,82 @@ object Scd2BatchProcessor {
     F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*)
   }
 }
+
+/**
+ * The three columns that locate a row on the SCD2 timeline: its source record-start sequence
+ * (`recordStartAt`, null only for decomposition tails) and the bounds of its visible interval
+ * [`startAt`, `endAt`). [[RowClassifier]] classifies a row purely from this triple.
+ */
+private[autocdc] case class Scd2IntervalColumns(
+    recordStartAt: Column,
+    startAt: Column,
+    endAt: Column) {
+
+  /**
+   * The row's effective ordering position. Decomposition tails carry no `recordStartAt` and
+   * fall back to their closing sequence (`endAt`), the same convention used by
+   * [[Scd2BatchProcessor.orderChronologicallyPerKeyWindow]].
+   */
+  def effectiveRecordStartAt: Column = F.coalesce(recordStartAt, endAt)
+}
+
+object RowClassifier {
+
+  /**
+   * Synthetic right boundary created by splitting a closed row, temporarily present during
+   * microbatch reconciliation but never materializes in the target or aux tables.
+   */
+  private[autocdc] def isDecompositionTail(row: Scd2IntervalColumns): Column =

Review Comment:
   Please try converting this into an unapply() case match. I'm not 100% sure 
that it will work, but my instinct is that it would be much more maintainable: 
right now there's a fairly complex implicit invariant that the methods of this 
classifier cover all the cases and don't overlap.
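
   A rough sketch of the shape this could take, on plain Scala values rather
than Spark `Column`s. `ClassifierSketch`, `Interval`, `classify`, and the
extractor objects are hypothetical names, and the tombstone condition is
inferred from the doc comment rather than copied from the PR. A single `match`
makes the precedence between cases explicit and gives one place for the
fall-through:

```scala
// Hypothetical sketch: plain-Scala stand-ins for the Column-based predicates.
object ClassifierSketch {
  // None models SQL NULL for each of the three interval columns.
  final case class Interval(
      recordStartAt: Option[Long],
      startAt: Option[Long],
      endAt: Option[Long])

  // Boolean extractors: `case DecompositionTail() =>` matches when unapply is true.
  object DecompositionTail {
    def unapply(r: Interval): Boolean =
      r.recordStartAt.isEmpty && r.startAt.isEmpty && r.endAt.nonEmpty
  }

  object OpenUpsert {
    def unapply(r: Interval): Boolean = r match {
      case Interval(Some(rec), Some(start), None) => start <= rec
      case _ => false
    }
  }

  object ClosedUpsert {
    def unapply(r: Interval): Boolean = r match {
      case Interval(Some(rec), Some(start), Some(stop)) =>
        rec < stop && start < stop && start <= rec
      case _ => false
    }
  }

  // Assumption: a tombstone is the zero-width interval at recordStartAt.
  object Tombstone {
    def unapply(r: Interval): Boolean = r match {
      case Interval(Some(rec), Some(start), Some(stop)) =>
        rec == start && start == stop
      case _ => false
    }
  }

  // First matching case wins; anything unmatched is reported as malformed.
  def classify(r: Interval): String = r match {
    case DecompositionTail() => "decomposition tail"
    case OpenUpsert() => "open upsert"
    case ClosedUpsert() => "closed upsert"
    case Tombstone() => "tombstone"
    case _ => "malformed"
  }
}
```

   Whether the same shape carries over to `Column` predicates, which are
lazily evaluated expressions rather than values, is the uncertain part; there
the match would likely have to build a `when`/`otherwise` chain instead.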



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala:
##########
@@ -547,3 +879,82 @@ object Scd2BatchProcessor {
     F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*)
   }
 }
+
+/**
+ * The three columns that locate a row on the SCD2 timeline: its source record-start sequence
+ * (`recordStartAt`, null only for decomposition tails) and the bounds of its visible interval
+ * [`startAt`, `endAt`). [[RowClassifier]] classifies a row purely from this triple.
+ */
+private[autocdc] case class Scd2IntervalColumns(
+    recordStartAt: Column,
+    startAt: Column,
+    endAt: Column) {
+
+  /**
+   * The row's effective ordering position. Decomposition tails carry no `recordStartAt` and
+   * fall back to their closing sequence (`endAt`), the same convention used by
+   * [[Scd2BatchProcessor.orderChronologicallyPerKeyWindow]].
+   */
+  def effectiveRecordStartAt: Column = F.coalesce(recordStartAt, endAt)
+}
+
+object RowClassifier {
+
+  /**
+   * Synthetic right boundary created by splitting a closed row, temporarily present during
+   * microbatch reconciliation but never materializes in the target or aux tables.
+   */
+  private[autocdc] def isDecompositionTail(row: Scd2IntervalColumns): Column =
+    row.recordStartAt.isNull && row.startAt.isNull && row.endAt.isNotNull
+
+  /**
+   * Upsert row that is currently open in the visible timeline. Hidden no-op upserts are
+   * also open until reconciliation decides whether they should stay hidden.
+   */
+  private[autocdc] def isOpenUpsert(row: Scd2IntervalColumns): Column =
+    row.recordStartAt.isNotNull &&
+      row.startAt.isNotNull &&
+      row.endAt.isNull &&
+      // startAt < recordStartAt implies this row belongs to but is not the head of some
+      // upsert-run, else this is the head of a run.
+      row.startAt <= row.recordStartAt
+
+  /**
+   * Upsert row whose visible interval has already been closed by a strictly later event;
+   * the historical counterpart to [[isOpenUpsert]].
+   *
+   * Notably, a zero-width [startAt, endAt) interval is not considered a valid closed upsert.
+   */
+  private[autocdc] def isClosedUpsert(row: Scd2IntervalColumns): Column =
+    row.recordStartAt.isNotNull &&
+      row.startAt.isNotNull &&
+      row.endAt.isNotNull &&
+      row.recordStartAt < row.endAt &&
+      row.startAt < row.endAt &&
+      // startAt <= recordStartAt covers both the run-head case (startAt == recordStartAt)
+      // and the no-op-continuation case (startAt < recordStartAt).
+      row.startAt <= row.recordStartAt
+
+  /**
+   * Any row that semantically encodes an upsert event.
+   */
+  private[autocdc] def isUpsertRepresentingRow(row: Scd2IntervalColumns): Column =
+    isOpenUpsert(row) || isClosedUpsert(row)
+
+  /**
+   * Tombstone (delete-boundary) row, encoded as an instantaneous interval at
+   * `recordStartAt`. Never materializes in the target table, only in the aux table.
+   *
+   * User-data column values on tombstones are not part of the SCD2 contract: they may
+   * reflect the originating delete event, the values of the upsert whose closed-interval
+   * row was bisected (when the tombstone was promoted from a decomposition tail), or be
+   * null altogether. Reconciliation does not consume these values for any semantic
+   * decision.
+   */
+  private[autocdc] def isTombstone(row: Scd2IntervalColumns): Column =

Review Comment:
   I worry about semantic issues with calling this a "tombstone", since the 
natural meaning of "tombstone" (this row used to exist but has since been 
deleted) is in fact represented by a closed upsert that just doesn't happen to 
have a follow-up. Perhaps "instantaneous delete"?



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala:
##########
@@ -340,6 +404,248 @@ case class Scd2BatchProcessor(
 
     affectedRowsFromTargetTable
   }
+
+  /**
+   * For every closed non-tombstone row in the input dataframe whose immediate window-order
+   * successor (in the same per-key partition, per [[orderChronologicallyPerKeyWindow]])
+   * has `recordStartAt` strictly less than its `endAt` (in other words the row is being
+   * bisected by its neighbor), replace that row by a "head" + "tail" pair:
+   *   - head: copies the parent row exactly, except [[endAtColName]] is set to null.
+   *   - tail: copies the parent row exactly, except [[startAtColName]] is set to null and
+   *     [[recordStartAtFieldName]] inside [[cdcMetadataColName]] is set to null. All user
+   *     data columns are inherited from the parent as-is.
+   *
+   * Decomposition tails are uniquely identified by [[recordStartAtFieldName]] = null and
+   * are temporary: downstream reconciliation drops them when a coincident non-tail row
+   * already represents the same closure, or promotes them to tombstones in the aux table
+   * otherwise.
+   *
+   * All other input rows pass through unchanged ("no-op decompose"). This includes:
+   *   1. Open rows ([[endAtColName]] = null): incoming upserts, no-op continuations, etc.
+   *   2. Tombstones ([[startAtColName]] = [[endAtColName]]): protected from decomposition
+   *      even though they qualify as "closed" in the broader sense, because their interval
+   *      is degenerate.
+   *   3. Closed non-tombstone rows whose successor's [[recordStartAtFieldName]] is `>=`
+   *      this row's [[endAtColName]]: the closing event already coincides with or follows
+   *      the run boundary, so there is nothing to bisect.
+   *
+   * Bisection detection is implemented via `LEAD(1)` over [[orderChronologicallyPerKeyWindow]]:
+   * because the window orders by effective recordStartAt ascending, examining only the
+   * immediate successor is sufficient to decide whether any other row in the partition has
+   * a recordStartAt within `[recordStartAt, endAt)`.
+   *
+   * Decomposing closed rows that are being bisected gives us that chance to form new
+   * closed intervals using the incoming microbatch events, later in reconciliation.
+   *
+   * @param rowsToDecomposePerKey
+   *   a dataframe conforming to the canonical SCD2 row schema
+   *   `[user_cols..., [[startAtColName]], [[endAtColName]], [[cdcMetadataColName]]]`, where
+   *   [[cdcMetadataColName]] conforms to [[cdcMetadataColSchema]]. Decomposition tails
+   *   (rows with [[recordStartAtFieldName]] = null) MUST NOT be present on input - they are
+   *   produced exclusively by this function.
+   * @return
+   *   a dataframe with the same schema as the input. Every closed non-tombstone row that
+   *   was bisected has been replaced by its head + tail pair; every other row is carried
+   *   through as-is. Each output row can be classified as one of: {decomposition head,
+   *   decomposition tail, tombstone, open upsert, closed-and-unbisected row}. It's possible
+   *   that some of the returned decomposition tails are logically redundant, as deletion
+   *   markers that are immediately overtaken by a succeeding row.
+   */
+  private[autocdc] def decomposeOutOfOrderRows(rowsToDecomposePerKey: DataFrame): DataFrame = {
+    val recordStartAtField =
+      Scd2BatchProcessor.recordStartAtOf(F.col(AutoCdcReservedNames.cdcMetadataColName))
+    val startAtCol = F.col(Scd2BatchProcessor.startAtColName)
+    val endAtCol = F.col(Scd2BatchProcessor.endAtColName)
+    val nextRecordStartAt = F.col(Scd2BatchProcessor.nextRecordStartAtColName)
+
+    // Track the next (in sorted order) row's recordStartAt in a temporary column.
+    val rowsToDecomposeWithWindowCols = rowsToDecomposePerKey.withColumn(
+      Scd2BatchProcessor.nextRecordStartAtColName,
+      F.lead(recordStartAtField, 1).over(orderChronologicallyPerKeyWindow)
+    )
+
+    val isClosedUpsertRow = RowClassifier.isClosedUpsert(
+      recordStartAt = recordStartAtField,
+      startAt = startAtCol,
+      endAt = endAtCol
+    )
+    val nextRowBisectsCurrentRow =
+      nextRecordStartAt.isNotNull && nextRecordStartAt < endAtCol
+    val rowShouldDecompose = isClosedUpsertRow && nextRowBisectsCurrentRow
+
+    val originalCols: Seq[String] = rowsToDecomposePerKey.columns.toSeq
+    val originalSchema: StructType = rowsToDecomposePerKey.schema
+    def withOriginalSchemaPreserved(fieldName: String, expr: Column): Column = {
+      val f = originalSchema(fieldName)
+      expr.cast(f.dataType).as(f.name, f.metadata)
+    }
+
+    // Constructs the head of a row post-decomposition.
+    def constructDecomposedRowHead: Column = {
+      val fields = originalCols.map {
+        case colName if colName == Scd2BatchProcessor.endAtColName =>
+          // End-at is opened (set to null); every other column is inherited as-is from the
+          // original parent row.
+          withOriginalSchemaPreserved(colName, F.lit(null))
+        case colName =>
+          withOriginalSchemaPreserved(colName, F.col(colName))
+      }
+      F.struct(fields: _*)
+    }
+
+    // Constructs the tail of a row post-decomposition.
+    def constructDecomposedRowTail: Column = {
+      val fields = originalCols.map {
+        case colName if colName == Scd2BatchProcessor.startAtColName =>
+          // Start-at is opened (set to null), every other column is inherited as-is from the
+          // original parent row.
+          withOriginalSchemaPreserved(colName, F.lit(null))
+        case colName if colName == AutoCdcReservedNames.cdcMetadataColName =>
+          withOriginalSchemaPreserved(
+            colName,
+            Scd2BatchProcessor.constructCdcMetadataCol(
+              recordStartAt = F.lit(null).cast(resolvedSequencingType),
+              sequencingType = resolvedSequencingType
+            )
+          )
+        case colName =>
+          withOriginalSchemaPreserved(colName, F.col(colName))
+      }
+      F.struct(fields: _*)
+    }
+
+    // No-op decomposition carries over the row exactly as-is.
+    def constructNoopDecomposedRow: Column = {
+      val fields = originalCols.map(colName =>
+        withOriginalSchemaPreserved(colName, F.col(colName))
+      )
+      F.struct(fields: _*)
+    }
+
+    // If a row is bisected by its window-order successor, decompose it into a head + tail
+    // pair. Otherwise pass through as a single-element array so the explode below is uniform.
+    val perRowDecompositionResults = F
+      .when(
+        rowShouldDecompose,
+        F.array(constructDecomposedRowHead, constructDecomposedRowTail)
+      )
+      .otherwise(F.array(constructNoopDecomposedRow))
+
+    rowsToDecomposeWithWindowCols
+      // The output schema matches the input schema exactly; no extra columns are projected.
+      .withColumn(
+        Scd2BatchProcessor.decompositionExplodedColName,
+        F.explode(perRowDecompositionResults)
+      )
+      .select(F.col(s"${Scd2BatchProcessor.decompositionExplodedColName}.*"))
+  }
+
+  /**
+   * Asserts that every row in `decomposedRowsPerKey` conforms to one of the four canonical
+   * post-decomposition shapes - tombstone, open upsert, closed upsert, or decomposition
+   * tail - and is otherwise a structural identity transform.
+   *
+   * @param decomposedRowsPerKey
+   *   the output of [[decomposeOutOfOrderRows]]: a dataframe conforming to the canonical
+   *   SCD2 row schema `[user_cols..., [[startAtColName]], [[endAtColName]],
+   *   [[cdcMetadataColName]]]`.
+   * @return
+   *   a dataframe with the exact same schema and rows as the input. Failing the
+   *   well-formedness check is treated as an internal-invariant violation: at execution
+   *   time, the first ill-formed row encountered aborts the query with a SparkRuntimeException
+   */
+  private[autocdc] def assertWellFormedRowsPostDecomposition(
+      decomposedRowsPerKey: DataFrame,
+      batchId: Long
+  ): DataFrame = {
+    val recordStartAtField =
+      Scd2BatchProcessor.recordStartAtOf(F.col(AutoCdcReservedNames.cdcMetadataColName))
+    val startAtCol = F.col(Scd2BatchProcessor.startAtColName)
+    val endAtCol = F.col(Scd2BatchProcessor.endAtColName)
+
+    val isWellFormedRow =
+      RowClassifier.isDecompositionTail(recordStartAtField, startAtCol, endAtCol) ||
+        RowClassifier.isTombstone(recordStartAtField, startAtCol, endAtCol) ||
+        RowClassifier.isUpsertRepresentingRow(recordStartAtField, startAtCol, endAtCol)
+
+    def stringOrNullLit(c: Column): Column = F.coalesce(c.cast(StringType), F.lit("null"))
+    val malformedRowDiagnostic = F.concat(
+      F.lit(
+        s"During SCD2 reconciliation of microbatch [id=${batchId}], encountered a " +
+        "post-decomposition row of unexpected shape:"
+      ),
+      F.lit(s" ${Scd2BatchProcessor.recordStartAtFieldName}="),
+      stringOrNullLit(recordStartAtField),
+      F.lit(s", ${Scd2BatchProcessor.startAtColName}="),
+      stringOrNullLit(startAtCol),
+      F.lit(s", ${Scd2BatchProcessor.endAtColName}="),
+      stringOrNullLit(endAtCol),
+      F.lit(".")
+    )
+
+    val internalErrorOnMalformed = ExpressionUtils.column(
+      If(
+        predicate = ExpressionUtils.expression(isWellFormedRow),
+        trueValue = Literal(null, BooleanType),
+        falseValue = RaiseError(
+          Literal("INTERNAL_ERROR"),
+          CreateMap(Seq(
+            Literal("message"),
+            ExpressionUtils.expression(malformedRowDiagnostic)
+          )),
+          BooleanType
+        )
+      )
+    )
+    decomposedRowsPerKey.filter(internalErrorOnMalformed.isNull)
+  }

Review Comment:
   I agree on balance that this makes sense. This is a compromise forced by 
the untyped nature of dataframes; in a more strongly typed system we would 
1000% do what's necessary to make this check happen at compile time.
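
   To illustrate what "at compile time" could look like outside of dataframes,
here is a minimal sketch that models the four post-decomposition shapes named
in the doc comment as a sealed hierarchy. All names (`TypedRowSketch`,
`Scd2Row`, `describe`, and the case classes) are hypothetical and do not exist
in the PR:

```scala
// Hypothetical sketch: the four post-decomposition shapes as a sealed hierarchy.
object TypedRowSketch {
  sealed trait Scd2Row
  final case class OpenUpsert(recordStartAt: Long, startAt: Long) extends Scd2Row
  final case class ClosedUpsert(recordStartAt: Long, startAt: Long, endAt: Long)
      extends Scd2Row
  final case class Tombstone(at: Long) extends Scd2Row
  final case class DecompositionTail(endAt: Long) extends Scd2Row

  // Because Scd2Row is sealed, the compiler warns if a case is missing here.
  def describe(row: Scd2Row): String = row match {
    case OpenUpsert(_, from) => s"open since $from"
    case ClosedUpsert(_, from, until) => s"visible in [$from, $until)"
    case Tombstone(at) => s"instantaneous delete at $at"
    case DecompositionTail(until) => s"tail closing at $until"
  }
}
```

   This only encodes which fields may be absent; ordering constraints such as
`startAt <= recordStartAt` would still need a runtime check, for example in a
validating constructor.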



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala:
##########
@@ -49,6 +51,60 @@ case class Scd2BatchProcessor(
    */
   private lazy val keysRaw: Seq[String] = changeArgs.keys.map(_.name)
 
+  /**
+   * WindowSpec that sorts CDC event rows in ascending order per key, by event origination
+   * sequence time (i.e, record start at).
+   */
+  private[autocdc] val orderChronologicallyPerKeyWindow: WindowSpec = {
+    val recordStartAtCol = Scd2BatchProcessor.recordStartAtOf(
+      F.col(AutoCdcReservedNames.cdcMetadataColName)
+    )
+    val startAtCol = F.col(Scd2BatchProcessor.startAtColName)
+    val endAtCol = F.col(Scd2BatchProcessor.endAtColName)
+    val row = Scd2IntervalColumns(recordStartAtCol, startAtCol, endAtCol)
+
+    // All rows except decomposition tails have a non-null recordStartAt. Tails use their

Review Comment:
   I think this comment got lost in a refactoring somewhere. It makes it sound 
like there's a bug here because there's no special case for decomposition 
tails, but actually it's all handled inside the effectiveRecordStartAt 
computation.
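
   As a plain-Scala analogue of why no special case is needed: once the
fallback lives inside the sort key, the ordering itself treats every row the
same way. `EffectiveOrderingSketch`, `Interval`, and `orderChronologically`
are hypothetical stand-ins, and the real `WindowSpec` may apply further
ordering keys that are not visible in this hunk:

```scala
// Hypothetical sketch: plain-Scala analogue of ordering by
// coalesce(recordStartAt, endAt).
object EffectiveOrderingSketch {
  // None models SQL NULL for each of the three interval columns.
  final case class Interval(
      recordStartAt: Option[Long],
      startAt: Option[Long],
      endAt: Option[Long]) {
    // First non-null wins, like F.coalesce(recordStartAt, endAt).
    def effectiveRecordStartAt: Option[Long] = recordStartAt.orElse(endAt)
  }

  // Tails need no special case: the fallback lives inside the sort key.
  def orderChronologically(rows: Seq[Interval]): Seq[Interval] =
    rows.sortBy(_.effectiveRecordStartAt)
}
```

   A decomposition tail with `endAt = 5` therefore sorts after an upsert with
`recordStartAt = 3` without the ordering code ever asking whether a row is a
tail.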



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

