zikangh commented on code in PR #55636:
URL: https://github.com/apache/spark/pull/55636#discussion_r3175462078


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -280,6 +280,26 @@ object UnsupportedOperationChecker extends Logging {
       case _ =>
     }
 
+    // The streaming Change Data Capture (CDC) row-level post-processing 
rewrite in
+    // [[ResolveChangelogTable.addStreamingRowLevelPostProcessing]] injects a 
streaming
+    // Aggregate buffering input rows into the helper column
+    // `ResolveChangelogTable.HelperColumn.Events` ("__spark_cdc_events") 
before
+    // re-emitting them via `Generate(Inline(...))`. The rewrite is designed 
and
+    // validated only for Append output mode -- under Update or Complete the 
Aggregate
+    // would re-emit per-batch state changes or the full result table per batch
+    // respectively, neither of which matches batch CDC semantics. Reject 
those modes
+    // at analysis time with a clear error rather than silently producing a 
misleading
+    // change feed.
+    if (outputMode != InternalOutputModes.Append &&
+        aggregates.exists(a => a.aggregateExpressions.exists {
+          case ne: NamedExpression if ne.resolved =>
+            ne.name == ResolveChangelogTable.HelperColumn.Events

Review Comment:
   The existing rules check a marker that lives in the node's metadata, see 
EventTimeWatermark.delayKey, SessionWindow.marker. Let's do the same here to 
make it more robust? 



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala:
##########


Review Comment:
   The test infra here does not effectively break up data into micro-batches, 
so a few central claims (e.g. multi-batch correctness) in this PR are not being 
tested. 



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala:
##########
@@ -662,4 +663,233 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
     }
     assert(e.getMessage.contains("changes"))
   }
+
+  // ---------- Streaming: row-level post-processing ----------
+  //
+  // Streaming row-level passes (carry-over removal, update detection) rewrite 
the plan
+  // into Aggregate(rowId, _commit_version, _commit_timestamp) -> [Filter] ->
+  // Generate(Inline(events)) -> [relabel Project], under an 
EventTimeWatermark on
+  // _commit_timestamp.
+
+  /** Schema variant for post-processing tests: includes `row_commit_version`. 
*/
+  private def recreateWithRowVersion(): Identifier = {
+    val id = ident
+    val cat = catalog
+    if (cat.tableExists(id)) cat.dropTable(id)
+    cat.createTable(
+      id,
+      Array(
+        Column.create("id", LongType, false),
+        Column.create("data", StringType),
+        Column.create("row_commit_version", LongType, false)),
+      Array.empty,
+      new util.HashMap[String, String]())
+    cat.clearChangeRows(id)
+    id
+  }
+
+  /** Row constructor for the row-version-enabled schema. */
+  private def ppRow(
+      id: Long,
+      data: String,
+      rcv: Long,
+      changeType: String,
+      commitVersion: Long,
+      commitTimestampMicros: Long): InternalRow = {
+    InternalRow(
+      id,
+      UTF8String.fromString(data),
+      rcv,
+      UTF8String.fromString(changeType),
+      commitVersion,
+      commitTimestampMicros)
+  }
+
+  test("streaming carry-over removal drops CoW pairs") {
+    val id = recreateWithRowVersion()
+    catalog.setChangelogProperties(id, ChangelogProperties(
+      containsCarryoverRows = true,
+      rowIdNames = Seq("id"),
+      rowVersionName = Some("row_commit_version")))
+
+    catalog.addChangeRows(id, Seq(
+      // v1: insert Alice (rcv=1), Bob (rcv=1)
+      ppRow(1L, "Alice", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L),
+      ppRow(2L, "Bob", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L),
+      // v2: real delete Alice + carry-over for Bob (rcv unchanged)
+      ppRow(1L, "Alice", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L),
+      ppRow(2L, "Bob", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L),
+      ppRow(2L, "Bob", 1L, CHANGE_TYPE_INSERT, 2L, 2000000L)))
+
+    val q = spark.readStream
+      .option("startingVersion", "1")
+      .changes(fullTableName)
+      .select("id", "data", "_change_type", "_commit_version")
+      .writeStream
+      .format("memory")
+      .queryName("cdc_stream_carryover")
+      .outputMode("append")
+      .start()
+    try {
+      q.processAllAvailable()
+      // The next micro-batch advances the input watermark to the max 
_commit_timestamp
+      // seen in the previous batch; append-mode aggregate eviction (eventTime 
<= watermark)
+      // then emits all groups including the highest commit. v1 inserts + 
Alice's real
+      // delete survive; Bob's carry-over pair at v2 is dropped.
+      checkAnswer(
+        spark.sql("SELECT * FROM cdc_stream_carryover"),
+        Seq(
+          Row(1L, "Alice", CHANGE_TYPE_INSERT, 1L),
+          Row(2L, "Bob", CHANGE_TYPE_INSERT, 1L),
+          Row(1L, "Alice", CHANGE_TYPE_DELETE, 2L)))
+    } finally {
+      q.stop()
+    }
+  }
+
+  test("streaming update detection relabels delete+insert as update") {

Review Comment:
   Batch E2E seems to cover much more test cases, should we at least cover 
those for streaming? 



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -197,6 +219,255 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
     removeHelperColumns(modifiedPlan)
   }
 
+  /**
+   * Streaming counterpart of [[addRowLevelPostProcessing]].
+   *
+   * ==Why a different shape from the batch path?==
+   *
+   * The batch rewrite is Window-based:
+   * {{{
+   *   DataSourceV2Relation
+   *     -> Window partitioned by (rowId..., _commit_version)
+   *     -> [Filter (carry-over)]
+   *     -> [Project (update relabel)]
+   *     -> Project (drop helper columns)
+   * }}}
+   * [[org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker]] 
rejects
+   * `Window` on streaming queries 
(`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`).
+   * Replacing it with a plain [[Aggregate]] is not enough on its own: an 
aggregate
+   * collapses each group to a single row, losing the per-input rows we still 
need to
+   * relabel/filter; and an append-mode streaming aggregate without an 
event-time
+   * watermark on a grouping key is itself rejected by the checker.
+   *
+   * ==The rewritten plan==
+   *
+   * Two adjustments over the naive substitution: (a) inject an 
[[EventTimeWatermark]]
+   * on `_commit_timestamp` (zero delay) so the aggregate is legal in append 
mode, and
+   * (b) buffer every input row of a group as `Inline`-able structs and 
re-explode after
+   * the aggregate so no rows are lost.
+   * {{{
+   *   DataSourceV2Relation
+   *     -> Filter (RaiseError on NULL _commit_timestamp)
+   *     -> EventTimeWatermark(_commit_timestamp, 0s)
+   *     -> Aggregate
+   *          group by (rowId..., _commit_version, _commit_timestamp)
+   *          aggs    : _del_cnt, _ins_cnt
+   *                    [, _min_rv, _max_rv, _rv_cnt  (carry-over removal 
only)]
+   *                    , __spark_cdc_events = collect_list(struct(*))
+   *     -> [Filter (carry-over: _del_cnt=1 AND _ins_cnt=1
+   *                             AND _rv_cnt=2 AND _min_rv=_max_rv)]
+   *     -> Generate(Inline(__spark_cdc_events))   // re-emit one row per 
buffered input
+   *     -> [Project (update relabel)]
+   *     -> Project (drop helper columns)
+   *     -> Project (strip internal EventTimeWatermark metadata)
+   * }}}
+   *
+   * ==Runtime walkthrough==
+   *
+   * Append-mode streaming aggregates emit a group when its event-time 
grouping key
+   * falls at or below the global watermark (eviction predicate `eventTime <= 
watermark`,
+   * applied at the start of the next micro-batch). Suppose three commits with
+   * `_commit_timestamp` 10, 20, 30 each arrive in their own micro-batch:
+   * {{{
+   *   batch  max _ts seen  watermark after batch  groups emitted by this batch
+   *   -----  ------------  ---------------------  ----------------------------
+   *     1         10                10            <none>
+   *     2         20                20            groups with 
_commit_timestamp == 10
+   *     3         30                30            groups with 
_commit_timestamp == 20
+   *   end-of-stream final flush                   groups with 
_commit_timestamp == 30
+   * }}}
+   * Because every row of a single commit shares the same `_commit_timestamp` 
(CDC
+   * contract), advancing past commit T releases every group whose grouping
+   * `_commit_timestamp` equals T -- one commit's worth of post-processed 
output per
+   * micro-batch, with the final commit flushed on stream termination.
+   *
+   * ==Per-operator detail==
+   *
+   *  0. [[Filter]] guarding against NULL `_commit_timestamp` -- raises
+   *     `CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP` for any row that
+   *     violates the contract. A NULL would never satisfy the downstream 
Aggregate's
+   *     `eventTime <= watermark` eviction predicate (NULL is silent in MAX, 
never
+   *     compares less-than-or-equal), so its group would be held in state 
forever.
+   *     Failing fast surfaces the connector bug instead of producing no 
output.
+   *  1. [[EventTimeWatermark]] on `_commit_timestamp` (zero delay) -- 
required so the
+   *     downstream stateful aggregate can emit groups in append output mode. 
By CDC
+   *     contract every row in a single commit shares `_commit_timestamp`, so 
taking it
+   *     as event time is safe.
+   *  2. [[Aggregate]] keyed by `(rowId..., _commit_version, 
_commit_timestamp)`. Computes
+   *     the same `_del_cnt` / `_ins_cnt` / (`_min_rv` / `_max_rv` / 
`_rv_cnt`) helpers as
+   *     the batch path, plus an `__spark_cdc_events` array-of-struct 
buffering every
+   *     input row of the group. `_commit_timestamp` is included in the 
grouping keys
+   *     (besides being a no-op given the contract) to satisfy
+   *     
[[org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker]]'s
+   *     requirement that the watermark attribute appear among grouping 
expressions for
+   *     append-mode streaming aggregations.
+   *  3. [[Filter]] (only when carry-over removal is requested) on the same 
predicate as
+   *     the batch path -- groups with `_del_cnt = 1 AND _ins_cnt = 1 AND 
_rv_cnt = 2 AND
+   *     _min_rv = _max_rv` are dropped wholesale.
+   *  4. [[Generate]] using `Inline(events)` to re-emit one output row per 
buffered input
+   *     row. `unrequiredChildIndex` drops the duplicate grouping columns and 
the events
+   *     buffer; the helper count columns flow through.
+   *  5. [[Project]] (only when update detection is requested) applying the 
same
+   *     
`CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION`
+   *     guard and `_change_type` relabel as the batch path.
+   *  6. [[Project]] (via [[removeHelperColumns]]) drops `__spark_cdc_*` 
helpers so
+   *     the output schema matches the connector's declared schema.
+   *  7. Final [[Project]] (via [[stripCommitTimestampWatermarkMetadata]]) 
clears the
+   *     `EventTimeWatermark.delayKey` from the user-visible 
`_commit_timestamp`
+   *     attribute so a downstream user-supplied `withWatermark` on a 
different column
+   *     does not interact with our internal watermark via the global 
multi-watermark
+   *     policy.
+   */
+  private def addStreamingRowLevelPostProcessing(
+      plan: LogicalPlan,
+      cl: Changelog,
+      requiresCarryOverRemoval: Boolean,
+      requiresUpdateDetection: Boolean): LogicalPlan = {
+    // Fail fast on a NULL `_commit_timestamp`. The downstream Aggregate uses 
it as
+    // both an event-time watermark column and a grouping key; a NULL 
group-key value
+    // would never satisfy the `eventTime <= watermark` eviction predicate, so 
the
+    // group would silently stall (held in state until end of stream). Mirrors 
the
+    // runtime check in [[CdcNetChangesStatefulProcessor]] -- fail fast at the
+    // contract violation rather than producing no output.
+    val plan1 = addNullCommitTimestampGuard(plan)
+    val rawCommitTsAttr = getAttribute(plan1, "_commit_timestamp")
+    val watermarked = EventTimeWatermark(
+      UUID.randomUUID(), rawCommitTsAttr, new CalendarInterval(0, 0, 0L), 
plan1)
+
+    val rowIdExprs = V2ExpressionUtils.resolveRefs[NamedExpression](
+      cl.rowId().toSeq, watermarked)
+    val commitVersionAttr = getAttribute(watermarked, "_commit_version")
+    // Pick up the post-watermark `_commit_timestamp` attribute -- it carries 
the
+    // EventTimeWatermark.delayKey metadata that UnsupportedOperationChecker 
scans for.
+    val commitTimestampAttr = getAttribute(watermarked, "_commit_timestamp")
+    val changeTypeAttr = getAttribute(watermarked, "_change_type")
+
+    val groupingExprs: Seq[Expression] =
+      rowIdExprs ++ Seq(commitVersionAttr, commitTimestampAttr)
+    val groupingNamedExprs: Seq[NamedExpression] =
+      groupingExprs.map(_.asInstanceOf[NamedExpression])
+
+    val insertIf = If(EqualTo(changeTypeAttr, 
Literal(Changelog.CHANGE_TYPE_INSERT)),
+      Literal(1), Literal(null, IntegerType))
+    val deleteIf = If(EqualTo(changeTypeAttr, 
Literal(Changelog.CHANGE_TYPE_DELETE)),
+      Literal(1), Literal(null, IntegerType))
+    val delCntAlias = Alias(
+      Count(Seq(deleteIf)).toAggregateExpression(), HelperColumn.DelCnt)()
+    val insCntAlias = Alias(
+      Count(Seq(insertIf)).toAggregateExpression(), HelperColumn.InsCnt)()
+
+    val rvAliases = if (requiresCarryOverRemoval) {
+      val rowVersionExpr = V2ExpressionUtils.resolveRef[NamedExpression](
+        cl.rowVersion(), watermarked)
+      Seq(
+        Alias(Min(rowVersionExpr).toAggregateExpression(), 
HelperColumn.MinRv)(),
+        Alias(Max(rowVersionExpr).toAggregateExpression(), 
HelperColumn.MaxRv)(),
+        Alias(Count(Seq(rowVersionExpr)).toAggregateExpression(), 
HelperColumn.RvCnt)())
+    } else Seq.empty
+
+    // Buffer every input row as a struct so Inline can re-emit them after the 
aggregate.
+    // The grouping-key columns (rowId..., `_commit_version`, 
`_commit_timestamp`) appear
+    // both inside the struct and as top-level grouping outputs; the top-level 
duplicates
+    // are dropped via `unrequiredChildIndex` below.
+    val structOfAllCols = CreateStruct(watermarked.output)
+    val eventsAlias = Alias(
+      new CollectList(structOfAllCols).toAggregateExpression(), 
HelperColumn.Events)()
+
+    val aggregateExprs: Seq[NamedExpression] =
+      groupingNamedExprs ++ Seq(delCntAlias, insCntAlias) ++ rvAliases :+ 
eventsAlias
+    val aggregated = Aggregate(groupingExprs, aggregateExprs, watermarked)
+
+    val filtered: LogicalPlan = if (requiresCarryOverRemoval) {
+      val delCnt = getAttribute(aggregated, HelperColumn.DelCnt)
+      val insCnt = getAttribute(aggregated, HelperColumn.InsCnt)
+      val minRv = getAttribute(aggregated, HelperColumn.MinRv)
+      val maxRv = getAttribute(aggregated, HelperColumn.MaxRv)
+      val rvCnt = getAttribute(aggregated, HelperColumn.RvCnt)
+      val isCarryoverPair = And(

Review Comment:
   This duplicates isCarryoverPair for batch. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to