AnishMahto commented on code in PR #56311:
URL: https://github.com/apache/spark/pull/56311#discussion_r3390118893


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala:
##########
@@ -340,6 +404,248 @@ case class Scd2BatchProcessor(
 
     affectedRowsFromTargetTable
   }
+
+  /**
+   * For every closed non-tombstone row in the input dataframe whose immediate 
window-order
+   * successor (in the same per-key partition, per 
[[orderChronologicallyPerKeyWindow]])
+   * has `recordStartAt` strictly less than its `endAt` (in other words the 
row is being
+   * bisected by its neighbor), replace that row by a "head" + "tail" pair:
+   *   - head: copies the parent row exactly, except [[endAtColName]] is set 
to null.
+   *   - tail: copies the parent row exactly, except [[startAtColName]] is set 
to null and
+   *     [[recordStartAtFieldName]] inside [[cdcMetadataColName]] is set to 
null. All user
+   *     data columns are inherited from the parent as-is.
+   *
+   * Decomposition tails are uniquely identified by [[recordStartAtFieldName]] 
= null and
+   * are temporary: downstream reconciliation drops them when a coincident 
non-tail row
+   * already represents the same closure, or promotes them to tombstones in 
the aux table
+   * otherwise.
+   *
+   * All other input rows pass through unchanged ("no-op decompose"). This 
includes:
+   *   1. Open rows ([[endAtColName]] = null): incoming upserts, no-op 
continuations, etc.
+   *   2. Tombstones ([[startAtColName]] = [[endAtColName]]): protected from 
decomposition
+   *      even though they qualify as "closed" in the broader sense, because 
their interval
+   *      is degenerate.
+   *   3. Closed non-tombstone rows whose successor's 
[[recordStartAtFieldName]] is `>=`
+   *      this row's [[endAtColName]]: the closing event already coincides 
with or follows
+   *      the run boundary, so there is nothing to bisect.
+   *
+   * Bisection detection is implemented via `LEAD(1)` over 
[[orderChronologicallyPerKeyWindow]]:
+   * because the window orders by effective recordStartAt ascending, examining 
only the
+   * immediate successor is sufficient to decide whether any other row in the 
partition has
+   * a recordStartAt within `[recordStartAt, endAt)`.
+   *
+   * Decomposing closed rows that are being bisected gives us that chance to 
form new
+   * closed intervals using the incoming microbatch events, later in 
reconciliation.
+   *
+   * @param rowsToDecomposePerKey
+   *   a dataframe conforming to the canonical SCD2 row schema
+   *   `[user_cols..., [[startAtColName]], [[endAtColName]], 
[[cdcMetadataColName]]]`, where
+   *   [[cdcMetadataColName]] conforms to [[cdcMetadataColSchema]]. 
Decomposition tails
+   *   (rows with [[recordStartAtFieldName]] = null) MUST NOT be present on 
input - they are
+   *   produced exclusively by this function.
+   * @return
+   *   a dataframe with the same schema as the input. Every closed 
non-tombstone row that
+   *   was bisected has been replaced by its head + tail pair; every other row 
is carried
+   *   through as-is. Each output row can be classified as one of: 
{decomposition head,
+   *   decomposition tail, tombstone, open upsert, closed-and-unbisected row}. 
It's possible
+   *   that some of the returned decomposition tails are logically redundant, 
as deletion
+   *   markers that are immediately overtaken by a succeeding row.
+   */
+  private[autocdc] def decomposeOutOfOrderRows(rowsToDecomposePerKey: 
DataFrame): DataFrame = {
+    val recordStartAtField =
+      
Scd2BatchProcessor.recordStartAtOf(F.col(AutoCdcReservedNames.cdcMetadataColName))
+    val startAtCol = F.col(Scd2BatchProcessor.startAtColName)
+    val endAtCol = F.col(Scd2BatchProcessor.endAtColName)
+    val nextRecordStartAt = F.col(Scd2BatchProcessor.nextRecordStartAtColName)
+
+    // Track the next (in sorted order) row's recordStartAt in a temporary 
column.
+    val rowsToDecomposeWithWindowCols = rowsToDecomposePerKey.withColumn(
+      Scd2BatchProcessor.nextRecordStartAtColName,
+      F.lead(recordStartAtField, 1).over(orderChronologicallyPerKeyWindow)
+    )
+
+    val isClosedUpsertRow = RowClassifier.isClosedUpsert(
+      recordStartAt = recordStartAtField,
+      startAt = startAtCol,
+      endAt = endAtCol
+    )
+    val nextRowBisectsCurrentRow =
+      nextRecordStartAt.isNotNull && nextRecordStartAt < endAtCol
+    val rowShouldDecompose = isClosedUpsertRow && nextRowBisectsCurrentRow
+
+    val originalCols: Seq[String] = rowsToDecomposePerKey.columns.toSeq
+    val originalSchema: StructType = rowsToDecomposePerKey.schema
+    def withOriginalSchemaPreserved(fieldName: String, expr: Column): Column = 
{
+      val f = originalSchema(fieldName)
+      expr.cast(f.dataType).as(f.name, f.metadata)
+    }
+
+    // Constructs the head of a row post-decomposition.
+    def constructDecomposedRowHead: Column = {
+      val fields = originalCols.map {
+        case colName if colName == Scd2BatchProcessor.endAtColName =>
+          // End-at is opened (set to null); every other column is inherited 
as-is from the
+          // original parent row.
+          withOriginalSchemaPreserved(colName, F.lit(null))
+        case colName =>
+          withOriginalSchemaPreserved(colName, F.col(colName))
+      }
+      F.struct(fields: _*)
+    }
+
+    // Constructs the tail of a row post-decomposition.
+    def constructDecomposedRowTail: Column = {
+      val fields = originalCols.map {
+        case colName if colName == Scd2BatchProcessor.startAtColName =>
+          // Start-at is opened (set to null), every other column is inherited 
as-is from the
+          // original parent row.
+          withOriginalSchemaPreserved(colName, F.lit(null))
+        case colName if colName == AutoCdcReservedNames.cdcMetadataColName =>
+          withOriginalSchemaPreserved(
+            colName,
+            Scd2BatchProcessor.constructCdcMetadataCol(
+              recordStartAt = F.lit(null).cast(resolvedSequencingType),
+              sequencingType = resolvedSequencingType
+            )
+          )
+        case colName =>
+          withOriginalSchemaPreserved(colName, F.col(colName))
+      }
+      F.struct(fields: _*)
+    }
+
+    // No-op decomposition carries over the row exactly as-is.
+    def constructNoopDecomposedRow: Column = {
+      val fields = originalCols.map(colName =>
+        withOriginalSchemaPreserved(colName, F.col(colName))
+      )
+      F.struct(fields: _*)
+    }
+
+    // If a row is bisected by its window-order successor, decompose it into a 
head + tail
+    // pair. Otherwise pass through as a single-element array so the explode 
below is uniform.
+    val perRowDecompositionResults = F
+      .when(
+        rowShouldDecompose,
+        F.array(constructDecomposedRowHead, constructDecomposedRowTail)
+      )
+      .otherwise(F.array(constructNoopDecomposedRow))
+
+    rowsToDecomposeWithWindowCols
+      // The output schema matches the input schema exactly; no extra columns 
are projected.
+      .withColumn(
+        Scd2BatchProcessor.decompositionExplodedColName,
+        F.explode(perRowDecompositionResults)
+      )
+      .select(F.col(s"${Scd2BatchProcessor.decompositionExplodedColName}.*"))
+  }
+
+  /**
+   * Asserts that every row in `decomposedRowsPerKey` conforms to one of the 
four canonical
+   * post-decomposition shapes - tombstone, open upsert, closed upsert, or 
decomposition
+   * tail - and is otherwise a structural identity transform.
+   *
+   * @param decomposedRowsPerKey
+   *   the output of [[decomposeOutOfOrderRows]]: a dataframe conforming to 
the canonical
+   *   SCD2 row schema `[user_cols..., [[startAtColName]], [[endAtColName]],
+   *   [[cdcMetadataColName]]]`.
+   * @return
+   *   a dataframe with the exact same schema and rows as the input. Failing 
the
+   *   well-formedness check is treated as an internal-invariant violation: at 
execution
+   *   time, the first ill-formed row encountered aborts the query with a 
SparkRuntimeException
+   */
+  private[autocdc] def assertWellFormedRowsPostDecomposition(
+      decomposedRowsPerKey: DataFrame,
+      batchId: Long
+  ): DataFrame = {
+    val recordStartAtField =
+      
Scd2BatchProcessor.recordStartAtOf(F.col(AutoCdcReservedNames.cdcMetadataColName))
+    val startAtCol = F.col(Scd2BatchProcessor.startAtColName)
+    val endAtCol = F.col(Scd2BatchProcessor.endAtColName)
+
+    val isWellFormedRow =
+      RowClassifier.isDecompositionTail(recordStartAtField, startAtCol, 
endAtCol) ||
+        RowClassifier.isTombstone(recordStartAtField, startAtCol, endAtCol) ||
+        RowClassifier.isUpsertRepresentingRow(recordStartAtField, startAtCol, 
endAtCol)
+
+    def stringOrNullLit(c: Column): Column = F.coalesce(c.cast(StringType), 
F.lit("null"))
+    val malformedRowDiagnostic = F.concat(
+      F.lit(
+        s"During SCD2 reconciliation of microbatch [id=${batchId}], 
encountered a " +
+        "post-decomposition row of unexpected shape:"
+      ),
+      F.lit(s" ${Scd2BatchProcessor.recordStartAtFieldName}="),
+      stringOrNullLit(recordStartAtField),
+      F.lit(s", ${Scd2BatchProcessor.startAtColName}="),
+      stringOrNullLit(startAtCol),
+      F.lit(s", ${Scd2BatchProcessor.endAtColName}="),
+      stringOrNullLit(endAtCol),
+      F.lit(".")
+    )
+
+    val internalErrorOnMalformed = ExpressionUtils.column(
+      If(
+        predicate = ExpressionUtils.expression(isWellFormedRow),
+        trueValue = Literal(null, BooleanType),
+        falseValue = RaiseError(
+          Literal("INTERNAL_ERROR"),
+          CreateMap(Seq(
+            Literal("message"),
+            ExpressionUtils.expression(malformedRowDiagnostic)
+          )),
+          BooleanType
+        )
+      )
+    )
+    decomposedRowsPerKey.filter(internalErrorOnMalformed.isNull)
+  }

Review Comment:
   Continuing discussion from the previous PR, here's the first internal 
validation logic that I'm considering adding to SCD2 microbatch reconciliation. 
   
   Internal validation here means we are asserting on properties that we expect 
to be true if our implementation is correct, and all of the invariants we 
expect to be true actually hold. 
   
   There's a real argument to be made for not having such validation. If the 
validation throws here, it indicates either the implementation is wrong OR the 
user has gone out of their way to mutate the aux/target in a way that is 
invalid with the invariants required by the algorithm. But we cannot 
distinguish between the two cases, so there isn't really any meaningful action 
users can take if this throws, other than trying a full-refresh and hoping the 
issue was transient. 
   
   There also is _some_ runtime penalty for executing this validation, although 
I'd argue it's likely negligible. The filter operation here is lazy, so we 
shouldn't be incurring an additional full dataframe scan.
   
   From an implementation hygiene perspective, there's also an argument that 
invariants should be asserted and contractualized in tests and not in 
execution. While I buy in to that logic, the SCD2 algorithm is sophisticated 
enough where we cannot reasonably expect test all meaningfully different input 
sets that stress-test the assumed invariants. At best we can have the 
random-data fuzz test, but that is still non-deterministic random sampling.
   
   Given all of this, reasons why I would argue for keeping this 
runtime-validation:
   - Although the exception is non-actionable for users, it's an extra guard to 
prevent us from accidentally corrupting user data if the implementation is 
incorrect or invariants are violated. At best the algorithm would throw at a 
later point anyway, at worst we corrupt the target/aux with incorrect data 
silently
   - It's not super expensive
   - The validation logic is self-documenting in a meaningful way. We can write 
as many comments as we want to try to document implicit invariants, but readers 
will have no way of knowing whether the comments themselves are still accurate, 
and readers will have to go through the full reasoning process by reading 
through multiple documented invariants across this algorithm. In contrast, 
post-validation, a reader can now genuinely trust the asserted properties, 
since the implementation would have thrown otherwise before that point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to