AnishMahto commented on code in PR #56311:
URL: https://github.com/apache/spark/pull/56311#discussion_r3390118893
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala:
##########
@@ -340,6 +404,248 @@ case class Scd2BatchProcessor(
     affectedRowsFromTargetTable
   }
+
+  /**
+   * For every closed non-tombstone row in the input dataframe whose immediate window-order
+   * successor (in the same per-key partition, per [[orderChronologicallyPerKeyWindow]])
+   * has `recordStartAt` strictly less than its `endAt` (in other words, the row is being
+   * bisected by its neighbor), replace that row by a "head" + "tail" pair:
+   *   - head: copies the parent row exactly, except [[endAtColName]] is set to null.
+   *   - tail: copies the parent row exactly, except [[startAtColName]] is set to null and
+   *     [[recordStartAtFieldName]] inside [[cdcMetadataColName]] is set to null. All user
+   *     data columns are inherited from the parent as-is.
+   *
+   * Decomposition tails are uniquely identified by [[recordStartAtFieldName]] = null and
+   * are temporary: downstream reconciliation drops them when a coincident non-tail row
+   * already represents the same closure, or otherwise promotes them to tombstones in the
+   * aux table.
+   *
+   * All other input rows pass through unchanged ("no-op decompose"). This includes:
+   *   1. Open rows ([[endAtColName]] = null): incoming upserts, no-op continuations, etc.
+   *   2. Tombstones ([[startAtColName]] = [[endAtColName]]): protected from decomposition
+   *      even though they qualify as "closed" in the broader sense, because their interval
+   *      is degenerate.
+   *   3. Closed non-tombstone rows whose successor's [[recordStartAtFieldName]] is `>=`
+   *      this row's [[endAtColName]]: the closing event already coincides with or follows
+   *      the run boundary, so there is nothing to bisect.
+   *
+   * Bisection detection is implemented via `LEAD(1)` over [[orderChronologicallyPerKeyWindow]]:
+   * because the window orders by effective recordStartAt ascending, examining only the
+   * immediate successor is sufficient to decide whether any other row in the partition has
+   * a recordStartAt within `[recordStartAt, endAt)`.
+   *
+   * Decomposing closed rows that are being bisected gives us the chance to form new closed
+   * intervals from the incoming microbatch events later in reconciliation.
+   *
+   * @param rowsToDecomposePerKey
+   *   a dataframe conforming to the canonical SCD2 row schema
+   *   `[user_cols..., [[startAtColName]], [[endAtColName]], [[cdcMetadataColName]]]`, where
+   *   [[cdcMetadataColName]] conforms to [[cdcMetadataColSchema]]. Decomposition tails
+   *   (rows with [[recordStartAtFieldName]] = null) MUST NOT be present on input; they are
+   *   produced exclusively by this function.
+   * @return
+   *   a dataframe with the same schema as the input. Every closed non-tombstone row that
+   *   was bisected has been replaced by its head + tail pair; every other row is carried
+   *   through as-is. Each output row can be classified as one of: {decomposition head,
+   *   decomposition tail, tombstone, open upsert, closed-and-unbisected row}. Some of the
+   *   returned decomposition tails may be logically redundant, as deletion markers that are
+   *   immediately overtaken by a succeeding row.
+   */
+  private[autocdc] def decomposeOutOfOrderRows(rowsToDecomposePerKey: DataFrame): DataFrame = {
+    val recordStartAtField =
+      Scd2BatchProcessor.recordStartAtOf(F.col(AutoCdcReservedNames.cdcMetadataColName))
+    val startAtCol = F.col(Scd2BatchProcessor.startAtColName)
+    val endAtCol = F.col(Scd2BatchProcessor.endAtColName)
+    val nextRecordStartAt = F.col(Scd2BatchProcessor.nextRecordStartAtColName)
+
+    // Track the next (in sorted order) row's recordStartAt in a temporary column.
+    val rowsToDecomposeWithWindowCols = rowsToDecomposePerKey.withColumn(
+      Scd2BatchProcessor.nextRecordStartAtColName,
+      F.lead(recordStartAtField, 1).over(orderChronologicallyPerKeyWindow)
+    )
+
+    val isClosedUpsertRow = RowClassifier.isClosedUpsert(
+      recordStartAt = recordStartAtField,
+      startAt = startAtCol,
+      endAt = endAtCol
+    )
+    val nextRowBisectsCurrentRow =
+      nextRecordStartAt.isNotNull && nextRecordStartAt < endAtCol
+    val rowShouldDecompose = isClosedUpsertRow && nextRowBisectsCurrentRow
+
+    val originalCols: Seq[String] = rowsToDecomposePerKey.columns.toSeq
+    val originalSchema: StructType = rowsToDecomposePerKey.schema
+    def withOriginalSchemaPreserved(fieldName: String, expr: Column): Column = {
+      val f = originalSchema(fieldName)
+      expr.cast(f.dataType).as(f.name, f.metadata)
+    }
+
+    // Constructs the head of a row post-decomposition.
+    def constructDecomposedRowHead: Column = {
+      val fields = originalCols.map {
+        case colName if colName == Scd2BatchProcessor.endAtColName =>
+          // End-at is opened (set to null); every other column is inherited as-is from the
+          // original parent row.
+          withOriginalSchemaPreserved(colName, F.lit(null))
+        case colName =>
+          withOriginalSchemaPreserved(colName, F.col(colName))
+      }
+      F.struct(fields: _*)
+    }
+
+    // Constructs the tail of a row post-decomposition.
+    def constructDecomposedRowTail: Column = {
+      val fields = originalCols.map {
+        case colName if colName == Scd2BatchProcessor.startAtColName =>
+          // Start-at is opened (set to null); the CDC metadata column is rebuilt below with a
+          // null recordStartAt. Every other column is inherited as-is from the original parent row.
+          withOriginalSchemaPreserved(colName, F.lit(null))
+        case colName if colName == AutoCdcReservedNames.cdcMetadataColName =>
+          withOriginalSchemaPreserved(
+            colName,
+            Scd2BatchProcessor.constructCdcMetadataCol(
+              recordStartAt = F.lit(null).cast(resolvedSequencingType),
+              sequencingType = resolvedSequencingType
+            )
+          )
+        case colName =>
+          withOriginalSchemaPreserved(colName, F.col(colName))
+      }
+      F.struct(fields: _*)
+    }
+
+    // No-op decomposition carries over the row exactly as-is.
+    def constructNoopDecomposedRow: Column = {
+      val fields = originalCols.map(colName =>
+        withOriginalSchemaPreserved(colName, F.col(colName))
+      )
+      F.struct(fields: _*)
+    }
+
+    // If a row is bisected by its window-order successor, decompose it into a head + tail
+    // pair. Otherwise pass through as a single-element array so the explode below is uniform.
+    val perRowDecompositionResults = F
+      .when(
+        rowShouldDecompose,
+        F.array(constructDecomposedRowHead, constructDecomposedRowTail)
+      )
+      .otherwise(F.array(constructNoopDecomposedRow))
+
+    rowsToDecomposeWithWindowCols
+      // The output schema matches the input schema exactly; no extra columns are projected.
+      .withColumn(
+        Scd2BatchProcessor.decompositionExplodedColName,
+        F.explode(perRowDecompositionResults)
+      )
+      .select(F.col(s"${Scd2BatchProcessor.decompositionExplodedColName}.*"))
+  }
+
+  /**
+   * Asserts that every row in `decomposedRowsPerKey` conforms to one of the four canonical
+   * post-decomposition shapes (tombstone, open upsert, closed upsert, or decomposition
+   * tail). Apart from that check, this is a structural identity transform.
+   *
+   * @param decomposedRowsPerKey
+   *   the output of [[decomposeOutOfOrderRows]]: a dataframe conforming to the canonical
+   *   SCD2 row schema `[user_cols..., [[startAtColName]], [[endAtColName]],
+   *   [[cdcMetadataColName]]]`.
+   * @param batchId
+   *   the id of the microbatch being reconciled; included in the diagnostic message.
+   * @return
+   *   a dataframe with the exact same schema and rows as the input. Failing the
+   *   well-formedness check is treated as an internal-invariant violation: at execution
+   *   time, the first ill-formed row encountered aborts the query with a
+   *   SparkRuntimeException.
+   */
+  private[autocdc] def assertWellFormedRowsPostDecomposition(
+      decomposedRowsPerKey: DataFrame,
+      batchId: Long
+  ): DataFrame = {
+    val recordStartAtField =
+      Scd2BatchProcessor.recordStartAtOf(F.col(AutoCdcReservedNames.cdcMetadataColName))
+    val startAtCol = F.col(Scd2BatchProcessor.startAtColName)
+    val endAtCol = F.col(Scd2BatchProcessor.endAtColName)
+
+    val isWellFormedRow =
+      RowClassifier.isDecompositionTail(recordStartAtField, startAtCol, endAtCol) ||
+        RowClassifier.isTombstone(recordStartAtField, startAtCol, endAtCol) ||
+        RowClassifier.isUpsertRepresentingRow(recordStartAtField, startAtCol, endAtCol)
+
+    def stringOrNullLit(c: Column): Column = F.coalesce(c.cast(StringType), F.lit("null"))
+    val malformedRowDiagnostic = F.concat(
+      F.lit(
+        s"During SCD2 reconciliation of microbatch [id=${batchId}], encountered a " +
+          "post-decomposition row of unexpected shape:"
+      ),
+      F.lit(s" ${Scd2BatchProcessor.recordStartAtFieldName}="),
+      stringOrNullLit(recordStartAtField),
+      F.lit(s", ${Scd2BatchProcessor.startAtColName}="),
+      stringOrNullLit(startAtCol),
+      F.lit(s", ${Scd2BatchProcessor.endAtColName}="),
+      stringOrNullLit(endAtCol),
+      F.lit(".")
+    )
+
+    val internalErrorOnMalformed = ExpressionUtils.column(
+      If(
+        predicate = ExpressionUtils.expression(isWellFormedRow),
+        trueValue = Literal(null, BooleanType),
+        falseValue = RaiseError(
+          Literal("INTERNAL_ERROR"),
+          CreateMap(Seq(
+            Literal("message"),
+            ExpressionUtils.expression(malformedRowDiagnostic)
+          )),
+          BooleanType
+        )
+      )
+    )
+    decomposedRowsPerKey.filter(internalErrorOnMalformed.isNull)
+  }
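
To reason about the decomposition rules in the Scaladoc above without Spark, here is a minimal plain-Scala sketch over an in-memory list of rows for a single key, already sorted by recordStartAt ascending. `SketchRow` and `DecomposeSketch` are hypothetical names. The closed-upsert test is an assumed simplification of `RowClassifier.isClosedUpsert`, whose definition is not in this diff. The bisection test mirrors `LEAD(1)`: each row looks only at its immediate successor's recordStartAt.

```scala
// Hypothetical stand-in for one SCD2 row of a single key. Only the columns that
// drive decomposition are modelled; `data` stands in for all user columns.
final case class SketchRow(
    data: String,
    recordStartAt: Option[Long],
    startAt: Option[Long],
    endAt: Option[Long]
)

object DecomposeSketch {
  // Assumed definition of a closed non-tombstone row: both bounds set and distinct.
  def isClosedUpsert(r: SketchRow): Boolean =
    r.recordStartAt.isDefined && r.startAt.isDefined && r.endAt.isDefined &&
      r.startAt != r.endAt

  // `sortedRows` must already be ordered by recordStartAt ascending for one key,
  // playing the role of orderChronologicallyPerKeyWindow.
  def decompose(sortedRows: Seq[SketchRow]): Seq[SketchRow] = {
    // Mirrors LEAD(recordStartAt, 1): the successor's recordStartAt, or None for the last row.
    val nextRecordStartAts = sortedRows.drop(1).map(_.recordStartAt) :+ None
    sortedRows.zip(nextRecordStartAts).flatMap { case (row, next) =>
      val bisected = (next, row.endAt) match {
        case (Some(n), Some(e)) => n < e
        case _                  => false
      }
      if (isClosedUpsert(row) && bisected)
        Seq(
          row.copy(endAt = None),                        // head: end-at opened
          row.copy(startAt = None, recordStartAt = None) // tail: start-at and recordStartAt nulled
        )
      else Seq(row) // no-op decompose
    }
  }
}
```

For a closed row covering `[1, 10)` followed by an upsert with recordStartAt 5, `decompose` returns the head (endAt cleared), then the tail (startAt and recordStartAt cleared), then the upsert unchanged. A tombstone in the same position passes through untouched.
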
Review Comment:
Continuing the discussion from the previous PR: here's the first internal
validation logic I'm considering adding to SCD2 microbatch reconciliation.
"Internal validation" here means asserting properties that should hold if our
implementation is correct and all of the invariants it relies on actually hold.
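
To make the asserted shapes concrete, here is a minimal Spark-free sketch of the same check over in-memory rows. The shape predicates are assumptions inferred from the Scaladoc in the diff, because the `RowClassifier` definitions are not shown. `ShapeRow` and `WellFormedSketch` are hypothetical, and an `IllegalStateException` stands in for the `INTERNAL_ERROR` raised through `RaiseError`.

```scala
// Hypothetical minimal row: only the three columns the shape check inspects.
final case class ShapeRow(
    recordStartAt: Option[Long],
    startAt: Option[Long],
    endAt: Option[Long]
)

object WellFormedSketch {
  // Assumed shapes, inferred from the Scaladoc in the diff; the real
  // RowClassifier predicates are not shown there.
  def isDecompositionTail(r: ShapeRow): Boolean =
    r.recordStartAt.isEmpty && r.startAt.isEmpty && r.endAt.isDefined

  def isTombstone(r: ShapeRow): Boolean =
    r.recordStartAt.isDefined && r.startAt.isDefined && r.startAt == r.endAt

  // Open upsert (endAt unset) or closed upsert (startAt < endAt).
  def isUpsert(r: ShapeRow): Boolean =
    r.recordStartAt.isDefined && r.startAt.exists(s => r.endAt.forall(e => s < e))

  def isWellFormed(r: ShapeRow): Boolean =
    isDecompositionTail(r) || isTombstone(r) || isUpsert(r)

  private def show(v: Option[Long]): String = v.fold("null")(_.toString)

  // Identity on well-formed rows; the first malformed row aborts with a diagnostic,
  // analogous to If(isWellFormedRow, null, RaiseError("INTERNAL_ERROR", ...)).
  def assertWellFormed(rows: Seq[ShapeRow], batchId: Long): Seq[ShapeRow] =
    rows.map { r =>
      if (isWellFormed(r)) r
      else
        throw new IllegalStateException(
          s"During SCD2 reconciliation of microbatch [id=$batchId], encountered a " +
            s"post-decomposition row of unexpected shape: recordStartAt=${show(r.recordStartAt)}, " +
            s"startAt=${show(r.startAt)}, endAt=${show(r.endAt)}."
        )
    }
}
```
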

There's a real argument to be made for not having such validation. If the
validation throws here, it indicates that either the implementation is wrong OR
the user has gone out of their way to mutate the aux/target in a way that
violates the invariants the algorithm requires. We cannot distinguish between
the two cases, so there isn't any meaningful action users can take if this
throws, other than trying a full refresh and hoping the issue was transient.

There is also _some_ runtime penalty for executing this validation, although
I'd argue it's likely negligible. The filter is lazy and, as a narrow
transformation, is evaluated row by row within the same plan, so we shouldn't
incur an additional full scan of the dataframe.
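
The "no extra pass" point can be illustrated by analogy with Scala iterators. This is not Spark's execution model, only a small demonstration that a per-row check attached to a lazy pipeline reads each source element once and stops at the first failure. `validated` and `CountingSource` are hypothetical names.

```scala
object LazyValidationSketch {
  // Attaches a per-row check to a row stream. Nothing runs until a consumer pulls
  // rows, and each row is checked as it flows past, so no extra traversal is needed.
  def validated[A](rows: Iterator[A])(isOk: A => Boolean)(describe: A => String): Iterator[A] =
    rows.map { a =>
      if (isOk(a)) a
      else throw new IllegalStateException(s"Malformed row: ${describe(a)}")
    }

  // Hypothetical source that counts how many elements have been read from it.
  final class CountingSource(n: Int) {
    var reads: Int = 0
    def iterator: Iterator[Int] = Iterator.range(0, n).map { i => reads += 1; i }
  }
}
```

Building the pipeline reads nothing. Draining it reads each source element exactly once, with the check included. A failing element aborts the traversal immediately, much like the first ill-formed row aborting the query.
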

From an implementation-hygiene perspective, there's also an argument that
invariants should be asserted and codified in tests rather than checked during
execution. While I buy into that logic, the SCD2 algorithm is sophisticated
enough that we cannot reasonably expect to test every meaningfully different
input set that stress-tests the assumed invariants. At best we can have the
random-data fuzz test, but that is still non-deterministic random sampling.
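
As a hypothetical example of the kind of property such a fuzz test can check, here is a seeded randomized comparison of the `LEAD(1)`-only bisection check against a brute-force scan over all later rows. This is the sufficiency claim in the `decomposeOutOfOrderRows` Scaladoc. A fixed seed makes any failing case replayable, but it still only samples the input space, which is the limitation described above. `Interval`, `bisectedByLead`, `bisectedByScan`, and `firstCounterexample` are illustrative names, not part of the PR.

```scala
import scala.util.Random

object LeadSufficiencyFuzz {
  // Hypothetical minimal closed row for one key: [recordStartAt, endAt).
  final case class Interval(recordStartAt: Long, endAt: Long)

  // LEAD(1)-style check: inspects only the immediate successor.
  def bisectedByLead(sorted: IndexedSeq[Interval], i: Int): Boolean =
    i + 1 < sorted.length && sorted(i + 1).recordStartAt < sorted(i).endAt

  // Brute-force reference: does ANY later row start inside [recordStartAt, endAt)?
  def bisectedByScan(sorted: IndexedSeq[Interval], i: Int): Boolean =
    sorted.drop(i + 1).exists { r =>
      r.recordStartAt >= sorted(i).recordStartAt && r.recordStartAt < sorted(i).endAt
    }

  // Draws `trials` random per-key partitions from a fixed seed, sorts each by
  // recordStartAt (as the window does), and returns the first disagreement, if any.
  def firstCounterexample(seed: Long, trials: Int): Option[IndexedSeq[Interval]] = {
    val rng = new Random(seed)
    Iterator
      .fill(trials) {
        val n = 1 + rng.nextInt(8)
        IndexedSeq
          .fill(n) {
            val start = rng.nextInt(20).toLong
            Interval(start, start + 1 + rng.nextInt(10))
          }
          .sortBy(_.recordStartAt)
      }
      .find(rows => rows.indices.exists(i => bisectedByLead(rows, i) != bisectedByScan(rows, i)))
  }
}
```

On unsorted input the two checks can disagree. For example, with intervals starting at 0, 12, and 5, the scan finds the row at 5 inside `[0, 10)` but the lead check does not. This is why the guarantee depends on the window's ordering.
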

Given all of this, here are my reasons for keeping this runtime validation:
- Although the exception is non-actionable for users, it's an extra guard
  against accidentally corrupting user data if the implementation is incorrect
  or invariants are violated. At best the algorithm would throw at a later point
  anyway; at worst we would silently corrupt the target/aux with incorrect data.
- It's not super expensive.
- The validation logic is self-documenting in a meaningful way. We can write as
  many comments as we want to document implicit invariants, but readers have no
  way of knowing whether the comments themselves are still accurate, and they
  have to go through the full reasoning process by reading multiple documented
  invariants spread across the algorithm. In contrast, past the validation
  point, a reader can genuinely trust the asserted properties, since the
  implementation would otherwise have thrown before reaching it.
--
This is an automated message from the Apache Git Service.