jose-torres commented on code in PR #56311:
URL: https://github.com/apache/spark/pull/56311#discussion_r3462843843
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala:
##########
@@ -547,3 +879,82 @@ object Scd2BatchProcessor {
F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*)
}
}
+
+/**
+ * The three columns that locate a row on the SCD2 timeline: its source record-start sequence
+ * (`recordStartAt`, null only for decomposition tails) and the bounds of its visible interval
+ * [`startAt`, `endAt`). [[RowClassifier]] classifies a row purely from this triple.
+ */
+private[autocdc] case class Scd2IntervalColumns(
+ recordStartAt: Column,
+ startAt: Column,
+ endAt: Column) {
+
+ /**
+ * The row's effective ordering position. Decomposition tails carry no `recordStartAt` and
+ * fall back to their closing sequence (`endAt`), the same convention used by
+ * [[Scd2BatchProcessor.orderChronologicallyPerKeyWindow]].
+ */
+ def effectiveRecordStartAt: Column = F.coalesce(recordStartAt, endAt)
+}
+
+object RowClassifier {
+
+ /**
+ * Synthetic right boundary created by splitting a closed row, temporarily present during
+ * microbatch reconciliation but never materializes in the target or aux tables.
+ */
+ private[autocdc] def isDecompositionTail(row: Scd2IntervalColumns): Column =
Review Comment:
Please try converting this into an unapply() case match. I'm not 100% sure
that it will work, but my instinct is that it will be much more maintainable
that way; right now there's a quite complex implicit invariant that the methods
of this classifier cover all the cases and don't overlap.
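The unapply() idea can be sketched on plain Scala values rather than Spark `Column`s. A Scala `match` cannot branch on a row's runtime values through a lazily evaluated `Column` expression, so this only models the intended shape. Everything below is hypothetical: `Interval` stands in for `Scd2IntervalColumns` with `None` playing SQL NULL, and the tombstone shape (`startAt == endAt == recordStartAt`) is an assumption read off the doc comment "instantaneous interval at `recordStartAt`". Because one match tries its cases in order, overlap is ruled out by construction, and the fall-through case makes the malformed bucket explicit instead of leaving it implied by the negation of several separate predicates.

```scala
object RowClassifierSketch {
  // Hypothetical stand-in for Scd2IntervalColumns; None plays the role of SQL NULL.
  final case class Interval(recordStartAt: Option[Long], startAt: Option[Long], endAt: Option[Long])

  sealed trait RowKind
  object RowKind {
    case object DecompositionTail extends RowKind
    case object Tombstone extends RowKind
    case object OpenUpsert extends RowKind
    case object ClosedUpsert extends RowKind
  }

  // Extractor: cases are tried top to bottom, so at most one kind is ever returned.
  // Rows matching no case yield None, i.e. they are malformed.
  object Classified {
    def unapply(row: Interval): Option[RowKind] = row match {
      case Interval(None, None, Some(_)) =>
        Some(RowKind.DecompositionTail)
      // Assumed tombstone shape: zero-width interval located at recordStartAt.
      case Interval(Some(r), Some(s), Some(e)) if s == r && e == r =>
        Some(RowKind.Tombstone)
      case Interval(Some(r), Some(s), None) if s <= r =>
        Some(RowKind.OpenUpsert)
      case Interval(Some(r), Some(s), Some(e)) if r < e && s < e && s <= r =>
        Some(RowKind.ClosedUpsert)
      case _ =>
        None
    }
  }

  // Caller-side match: the wildcard is the single place that handles malformed rows.
  def describe(row: Interval): String = row match {
    case Classified(kind) => kind.toString
    case _                => "malformed"
  }
}
```

On the DataFrame side, one possible translation (an assumption, not something the PR does) is to fold the same ordered cases into a single `F.when(...).when(...).otherwise(...)` chain that yields a kind label, so the first-match ordering carries over to the Column expression.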
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala:
##########
@@ -547,3 +879,82 @@ object Scd2BatchProcessor {
F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*)
}
}
+
+/**
+ * The three columns that locate a row on the SCD2 timeline: its source record-start sequence
+ * (`recordStartAt`, null only for decomposition tails) and the bounds of its visible interval
+ * [`startAt`, `endAt`). [[RowClassifier]] classifies a row purely from this triple.
+ */
+private[autocdc] case class Scd2IntervalColumns(
+ recordStartAt: Column,
+ startAt: Column,
+ endAt: Column) {
+
+ /**
+ * The row's effective ordering position. Decomposition tails carry no `recordStartAt` and
+ * fall back to their closing sequence (`endAt`), the same convention used by
+ * [[Scd2BatchProcessor.orderChronologicallyPerKeyWindow]].
+ */
+ def effectiveRecordStartAt: Column = F.coalesce(recordStartAt, endAt)
+}
+
+object RowClassifier {
+
+ /**
+ * Synthetic right boundary created by splitting a closed row, temporarily present during
+ * microbatch reconciliation but never materializes in the target or aux tables.
+ */
+ private[autocdc] def isDecompositionTail(row: Scd2IntervalColumns): Column =
+ row.recordStartAt.isNull && row.startAt.isNull && row.endAt.isNotNull
+
+ /**
+ * Upsert row that is currently open in the visible timeline. Hidden no-op upserts are
+ * also open until reconciliation decides whether they should stay hidden.
+ */
+ private[autocdc] def isOpenUpsert(row: Scd2IntervalColumns): Column =
+ row.recordStartAt.isNotNull &&
+ row.startAt.isNotNull &&
+ row.endAt.isNull &&
+ // startAt < recordStartAt implies this row belongs to but is not the head of some
+ // upsert-run, else this is the head of a run.
+ row.startAt <= row.recordStartAt
+
+ /**
+ * Upsert row whose visible interval has already been closed by a strictly later event;
+ * the historical counterpart to [[isOpenUpsert]].
+ *
+ * Notably, a zero-width [startAt, endAt) interval is not considered a valid closed upsert.
+ */
+ private[autocdc] def isClosedUpsert(row: Scd2IntervalColumns): Column =
+ row.recordStartAt.isNotNull &&
+ row.startAt.isNotNull &&
+ row.endAt.isNotNull &&
+ row.recordStartAt < row.endAt &&
+ row.startAt < row.endAt &&
+ // startAt <= recordStartAt covers both the run-head case (startAt == recordStartAt)
+ // and the no-op-continuation case (startAt < recordStartAt).
+ row.startAt <= row.recordStartAt
+
+ /**
+ * Any row that semantically encodes an upsert event.
+ */
+ private[autocdc] def isUpsertRepresentingRow(row: Scd2IntervalColumns): Column =
+ isOpenUpsert(row) || isClosedUpsert(row)
+
+ /**
+ * Tombstone (delete-boundary) row, encoded as an instantaneous interval at
+ * `recordStartAt`. Never materializes in the target table, only in the aux table.
+ *
+ * User-data column values on tombstones are not part of the SCD2 contract: they may
+ * reflect the originating delete event, the values of the upsert whose closed-interval
+ * row was bisected (when the tombstone was promoted from a decomposition tail), or be
+ * null altogether. Reconciliation does not consume these values for any semantic
+ * decision.
+ */
+ private[autocdc] def isTombstone(row: Scd2IntervalColumns): Column =
Review Comment:
I worry about semantic issues with calling this a "tombstone", since the
natural meaning of "tombstone" (this row used to exist but now was deleted) is
in fact represented by a closed upsert that just doesn't happen to have a
followup. Perhaps "instantaneous delete"?
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala:
##########
@@ -340,6 +404,248 @@ case class Scd2BatchProcessor(
affectedRowsFromTargetTable
}
+
+ /**
+ * For every closed non-tombstone row in the input dataframe whose immediate window-order
+ * successor (in the same per-key partition, per [[orderChronologicallyPerKeyWindow]])
+ * has `recordStartAt` strictly less than its `endAt` (in other words the row is being
+ * bisected by its neighbor), replace that row by a "head" + "tail" pair:
+ * - head: copies the parent row exactly, except [[endAtColName]] is set to null.
+ * - tail: copies the parent row exactly, except [[startAtColName]] is set to null and
+ * [[recordStartAtFieldName]] inside [[cdcMetadataColName]] is set to null. All user
+ * data columns are inherited from the parent as-is.
+ *
+ * Decomposition tails are uniquely identified by [[recordStartAtFieldName]] = null and
+ * are temporary: downstream reconciliation drops them when a coincident non-tail row
+ * already represents the same closure, or promotes them to tombstones in the aux table
+ * otherwise.
+ *
+ * All other input rows pass through unchanged ("no-op decompose"). This includes:
+ * 1. Open rows ([[endAtColName]] = null): incoming upserts, no-op continuations, etc.
+ * 2. Tombstones ([[startAtColName]] = [[endAtColName]]): protected from decomposition
+ * even though they qualify as "closed" in the broader sense, because their interval
+ * is degenerate.
+ * 3. Closed non-tombstone rows whose successor's [[recordStartAtFieldName]] is `>=`
+ * this row's [[endAtColName]]: the closing event already coincides with or follows
+ * the run boundary, so there is nothing to bisect.
+ *
+ * Bisection detection is implemented via `LEAD(1)` over [[orderChronologicallyPerKeyWindow]]:
+ * because the window orders by effective recordStartAt ascending, examining only the
+ * immediate successor is sufficient to decide whether any other row in the partition has
+ * a recordStartAt within `[recordStartAt, endAt)`.
+ *
+ * Decomposing closed rows that are being bisected gives us the chance to form new
+ * closed intervals using the incoming microbatch events, later in reconciliation.
+ *
+ * @param rowsToDecomposePerKey
+ * a dataframe conforming to the canonical SCD2 row schema
+ * `[user_cols..., [[startAtColName]], [[endAtColName]], [[cdcMetadataColName]]]`, where
+ * [[cdcMetadataColName]] conforms to [[cdcMetadataColSchema]]. Decomposition tails
+ * (rows with [[recordStartAtFieldName]] = null) MUST NOT be present on input - they are
+ * produced exclusively by this function.
+ * @return
+ * a dataframe with the same schema as the input. Every closed non-tombstone row that
+ * was bisected has been replaced by its head + tail pair; every other row is carried
+ * through as-is. Each output row can be classified as one of: {decomposition head,
+ * decomposition tail, tombstone, open upsert, closed-and-unbisected row}. It's possible
+ * that some of the returned decomposition tails are logically redundant, as deletion
+ * markers that are immediately overtaken by a succeeding row.
+ */
+ private[autocdc] def decomposeOutOfOrderRows(rowsToDecomposePerKey: DataFrame): DataFrame = {
+ val recordStartAtField =
+ Scd2BatchProcessor.recordStartAtOf(F.col(AutoCdcReservedNames.cdcMetadataColName))
+ val startAtCol = F.col(Scd2BatchProcessor.startAtColName)
+ val endAtCol = F.col(Scd2BatchProcessor.endAtColName)
+ val nextRecordStartAt = F.col(Scd2BatchProcessor.nextRecordStartAtColName)
+
+ // Track the next (in sorted order) row's recordStartAt in a temporary column.
+ val rowsToDecomposeWithWindowCols = rowsToDecomposePerKey.withColumn(
+ Scd2BatchProcessor.nextRecordStartAtColName,
+ F.lead(recordStartAtField, 1).over(orderChronologicallyPerKeyWindow)
+ )
+
+ val isClosedUpsertRow = RowClassifier.isClosedUpsert(
+ recordStartAt = recordStartAtField,
+ startAt = startAtCol,
+ endAt = endAtCol
+ )
+ val nextRowBisectsCurrentRow =
+ nextRecordStartAt.isNotNull && nextRecordStartAt < endAtCol
+ val rowShouldDecompose = isClosedUpsertRow && nextRowBisectsCurrentRow
+
+ val originalCols: Seq[String] = rowsToDecomposePerKey.columns.toSeq
+ val originalSchema: StructType = rowsToDecomposePerKey.schema
+ def withOriginalSchemaPreserved(fieldName: String, expr: Column): Column = {
+ val f = originalSchema(fieldName)
+ expr.cast(f.dataType).as(f.name, f.metadata)
+ }
+
+ // Constructs the head of a row post-decomposition.
+ def constructDecomposedRowHead: Column = {
+ val fields = originalCols.map {
+ case colName if colName == Scd2BatchProcessor.endAtColName =>
+ // End-at is opened (set to null); every other column is inherited as-is from the
+ // original parent row.
+ withOriginalSchemaPreserved(colName, F.lit(null))
+ case colName =>
+ withOriginalSchemaPreserved(colName, F.col(colName))
+ }
+ F.struct(fields: _*)
+ }
+
+ // Constructs the tail of a row post-decomposition.
+ def constructDecomposedRowTail: Column = {
+ val fields = originalCols.map {
+ case colName if colName == Scd2BatchProcessor.startAtColName =>
+ // Start-at is opened (set to null); every other column is inherited as-is from the
+ // original parent row.
+ withOriginalSchemaPreserved(colName, F.lit(null))
+ case colName if colName == AutoCdcReservedNames.cdcMetadataColName =>
+ withOriginalSchemaPreserved(
+ colName,
+ Scd2BatchProcessor.constructCdcMetadataCol(
+ recordStartAt = F.lit(null).cast(resolvedSequencingType),
+ sequencingType = resolvedSequencingType
+ )
+ )
+ case colName =>
+ withOriginalSchemaPreserved(colName, F.col(colName))
+ }
+ F.struct(fields: _*)
+ }
+
+ // No-op decomposition carries over the row exactly as-is.
+ def constructNoopDecomposedRow: Column = {
+ val fields = originalCols.map(colName =>
+ withOriginalSchemaPreserved(colName, F.col(colName))
+ )
+ F.struct(fields: _*)
+ }
+
+ // If a row is bisected by its window-order successor, decompose it into a head + tail
+ // pair. Otherwise pass through as a single-element array so the explode below is uniform.
+ val perRowDecompositionResults = F
+ .when(
+ rowShouldDecompose,
+ F.array(constructDecomposedRowHead, constructDecomposedRowTail)
+ )
+ .otherwise(F.array(constructNoopDecomposedRow))
+
+ rowsToDecomposeWithWindowCols
+ // The output schema matches the input schema exactly; no extra columns are projected.
+ .withColumn(
+ Scd2BatchProcessor.decompositionExplodedColName,
+ F.explode(perRowDecompositionResults)
+ )
+ .select(F.col(s"${Scd2BatchProcessor.decompositionExplodedColName}.*"))
+ }
+
+ /**
+ * Asserts that every row in `decomposedRowsPerKey` conforms to one of the four canonical
+ * post-decomposition shapes - tombstone, open upsert, closed upsert, or decomposition
+ * tail - and is otherwise a structural identity transform.
+ *
+ * @param decomposedRowsPerKey
+ * the output of [[decomposeOutOfOrderRows]]: a dataframe conforming to the canonical
+ * SCD2 row schema `[user_cols..., [[startAtColName]], [[endAtColName]],
+ * [[cdcMetadataColName]]]`.
+ * @return
+ * a dataframe with the exact same schema and rows as the input. Failing the
+ * well-formedness check is treated as an internal-invariant violation: at execution
+ * time, the first ill-formed row encountered aborts the query with a
+ * SparkRuntimeException.
+ */
+ private[autocdc] def assertWellFormedRowsPostDecomposition(
+ decomposedRowsPerKey: DataFrame,
+ batchId: Long
+ ): DataFrame = {
+ val recordStartAtField =
+ Scd2BatchProcessor.recordStartAtOf(F.col(AutoCdcReservedNames.cdcMetadataColName))
+ val startAtCol = F.col(Scd2BatchProcessor.startAtColName)
+ val endAtCol = F.col(Scd2BatchProcessor.endAtColName)
+
+ val isWellFormedRow =
+ RowClassifier.isDecompositionTail(recordStartAtField, startAtCol, endAtCol) ||
+ RowClassifier.isTombstone(recordStartAtField, startAtCol, endAtCol) ||
+ RowClassifier.isUpsertRepresentingRow(recordStartAtField, startAtCol, endAtCol)
+
+ def stringOrNullLit(c: Column): Column = F.coalesce(c.cast(StringType), F.lit("null"))
+ val malformedRowDiagnostic = F.concat(
+ F.lit(
+ s"During SCD2 reconciliation of microbatch [id=${batchId}], encountered a " +
+ "post-decomposition row of unexpected shape:"
+ ),
+ F.lit(s" ${Scd2BatchProcessor.recordStartAtFieldName}="),
+ stringOrNullLit(recordStartAtField),
+ F.lit(s", ${Scd2BatchProcessor.startAtColName}="),
+ stringOrNullLit(startAtCol),
+ F.lit(s", ${Scd2BatchProcessor.endAtColName}="),
+ stringOrNullLit(endAtCol),
+ F.lit(".")
+ )
+
+ val internalErrorOnMalformed = ExpressionUtils.column(
+ If(
+ predicate = ExpressionUtils.expression(isWellFormedRow),
+ trueValue = Literal(null, BooleanType),
+ falseValue = RaiseError(
+ Literal("INTERNAL_ERROR"),
+ CreateMap(Seq(
+ Literal("message"),
+ ExpressionUtils.expression(malformedRowDiagnostic)
+ )),
+ BooleanType
+ )
+ )
+ )
+ decomposedRowsPerKey.filter(internalErrorOnMalformed.isNull)
+ }
Review Comment:
I agree on balance that this makes sense. This is a compromise being forced
by the untyped nature of dataframes, in a more strongly typed system we would
1000% do what's necessary to make this check happen at compile time.
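The bisection rule in `decomposeOutOfOrderRows` can be modeled per key on plain Scala values, which makes the head/tail split easy to check by hand. This is a minimal sketch under stated assumptions: `SketchRow` is a hypothetical stand-in for one canonical SCD2 row with a single user column, `None` plays SQL NULL, the sort uses only the effective `recordStartAt` (the real window may add tie-breakers), and the successor lookup mirrors `F.lead(recordStartAtField, 1)`.

```scala
object DecompositionSketch {
  // Hypothetical stand-in for one canonical SCD2 row: a single user column plus the
  // interval columns. None plays the role of SQL NULL.
  final case class SketchRow(
      user: String,
      recordStartAt: Option[Long],
      startAt: Option[Long],
      endAt: Option[Long])

  // Same conditions as RowClassifier.isClosedUpsert; tombstones (startAt == endAt)
  // fail startAt < endAt and are therefore never decomposed.
  def isClosedUpsert(r: SketchRow): Boolean = (r.recordStartAt, r.startAt, r.endAt) match {
    case (Some(rs), Some(s), Some(e)) => rs < e && s < e && s <= rs
    case _                            => false
  }

  // Analogue of F.coalesce(recordStartAt, endAt).
  def effectiveRecordStartAt(r: SketchRow): Option[Long] = r.recordStartAt.orElse(r.endAt)

  // Rows for a single key. Sorts by effective recordStartAt (None first, like NULLS FIRST),
  // pairs each row with its successor's recordStartAt (the LEAD(1) analogue), and splits
  // closed upserts whose successor starts strictly before their endAt.
  def decomposePerKey(rows: Seq[SketchRow]): Seq[SketchRow] = {
    val sorted = rows.sortBy(r => effectiveRecordStartAt(r))
    val nextRecordStartAts: Seq[Option[Long]] = sorted.drop(1).map(_.recordStartAt) :+ None
    sorted.zip(nextRecordStartAts).flatMap { case (row, next) =>
      val bisected = (next, row.endAt) match {
        case (Some(n), Some(e)) => n < e
        case _                  => false
      }
      if (isClosedUpsert(row) && bisected)
        Seq(
          row.copy(endAt = None),                         // head: reopened
          row.copy(startAt = None, recordStartAt = None)) // tail: closing boundary only
      else
        Seq(row) // no-op decompose
    }
  }
}
```

For example, a closed row `[1, 10)` followed by an out-of-order upsert at 5 comes back as an open head at 1 plus a tail closing at 10, with user data inherited by both, while the upsert itself passes through unchanged.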
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala:
##########
@@ -49,6 +51,60 @@ case class Scd2BatchProcessor(
*/
private lazy val keysRaw: Seq[String] = changeArgs.keys.map(_.name)
+ /**
+ * WindowSpec that sorts CDC event rows in ascending order per key, by event origination
+ * sequence time (i.e., record start at).
+ */
+ private[autocdc] val orderChronologicallyPerKeyWindow: WindowSpec = {
+ val recordStartAtCol = Scd2BatchProcessor.recordStartAtOf(
+ F.col(AutoCdcReservedNames.cdcMetadataColName)
+ )
+ val startAtCol = F.col(Scd2BatchProcessor.startAtColName)
+ val endAtCol = F.col(Scd2BatchProcessor.endAtColName)
+ val row = Scd2IntervalColumns(recordStartAtCol, startAtCol, endAtCol)
+
+ // All rows except decomposition tails have a non-null recordStartAt. Tails use their
Review Comment:
I think this comment got lost in a refactoring somewhere. It makes it sound
like there's a bug here because there's no special case for decomposition
tails, but actually it's all handled inside effectiveRecordStartAt computation.
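The reviewer's point can be seen in a small sketch: ordering by `recordStartAt.orElse(endAt)`, the plain-Scala analogue of `F.coalesce(recordStartAt, endAt)`, already places a decomposition tail at its closing sequence, so the window needs no tail-specific branch. `TimelineRow` is a hypothetical stand-in for `Scd2IntervalColumns`, and `None` sorting first is assumed to mirror Spark's default ascending `NULLS FIRST`.

```scala
object EffectiveOrderSketch {
  // Hypothetical stand-in for Scd2IntervalColumns; None plays the role of SQL NULL.
  final case class TimelineRow(recordStartAt: Option[Long], startAt: Option[Long], endAt: Option[Long]) {
    // Analogue of F.coalesce(recordStartAt, endAt): a tail has no recordStartAt,
    // so it is positioned at its closing sequence instead.
    def effectiveRecordStartAt: Option[Long] = recordStartAt.orElse(endAt)
  }

  // Ascending order; Scala's Ordering[Option[Long]] puts None first, like Spark's
  // default ascending NULLS FIRST. No tail-specific branch is needed.
  def orderChronologically(rows: Seq[TimelineRow]): Seq[TimelineRow] =
    rows.sortBy(_.effectiveRecordStartAt)
}
```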