zikangh commented on code in PR #55637:
URL: https://github.com/apache/spark/pull/55637#discussion_r3175648372
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -600,6 +615,114 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
filteredAndRelabeledPlan
}
+ /**
+ * Streaming counterpart of [[injectNetChangeComputation]]. The batch version uses a
+ * Catalyst `Window` partitioned by `rowId`, which is rejected on streaming queries.
+ * This version delegates the per-`rowId` first/last extraction and the SPIP collapse
+ * matrix to a [[CdcNetChangesStatefulProcessor]] driven by `transformWithState`:
+ *
+ * 1. [[EventTimeWatermark]] on `_commit_timestamp` (zero delay) so the global query
+ * watermark advances with each batch. When this rewrite runs on top of the row-level
+ * post-processing rewrite (combined `containsCarryoverRows` /
+ * `representsUpdateAsDeleteAndInsert` + `containsIntermediateChanges` path), the
+ * row-level rewrite has already injected an identical `EventTimeWatermark` and we
+ * reuse it instead of stacking a second one. Stacking watermarks on the same column
+ * fails the multi-watermark check unless `STATEFUL_OPERATOR_ALLOW_MULTIPLE` is set,
+ * and even then it would just produce two redundant nodes.
+ * 2. [[Project]] that aliases each rowId expression to a top-level helper column. This
+ * lets us address the rowId as an `Attribute` for the `transformWithState` grouping,
+ * which in turn makes nested rowId paths (e.g. `payload.id`) work without special
+ * casing.
+ * 3. [[TransformWithState]] keyed by the rowId helper attributes, in
+ * [[org.apache.spark.sql.catalyst.plans.logical.EventTime]] mode. The processor
+ * buffers the first and last event per row identity; an event-time timer set to the
+ * latest observed `_commit_timestamp` fires once the global watermark advances past
+ * it, at which point the processor evaluates the SPIP `(existedBefore, existsAfter)`
+ * matrix and emits 0, 1, or 2 output rows.
+ * 4. [[SerializeFromObject]] (added by the `transformWithState` factory) brings the
+ * processor's `Row` outputs back into a regular tabular shape.
+ * 5. Final [[Project]] drops the rowId helper columns so the user-visible schema
+ * matches the connector's declared changelog schema.
+ *
+ * Streaming netChanges is incremental, not range-scoped: per-row-identity state is
+ * cleared on emission, so a later commit on the same identity starts a fresh window
+ * and produces additional output rows. Batch netChanges over the same version range
+ * would have collapsed those changes; streaming cannot retract already-emitted rows
+ * to match that. End-of-stream flushes all pending timers, so a bounded stream's
+ * output matches batch only when no row identity is touched again after its first
+ * emission.
+ */
+ private def addStreamingNetChangeComputation(
Review Comment:
Are we actually rejecting the Update and Complete output modes? This method produces a
plan containing TransformWithState, and I'm not sure UnsupportedOperationChecker matches
it correctly. Let's add a test case to be sure.
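
Separately, to make the "incremental, not range-scoped" paragraph of the Scaladoc concrete, here is a rough plain-Scala model of the first/last collapse. It is only a sketch and does not use Spark: `NetChangeSketch`, `Event`, the change-type labels, and the rule for deriving `existedBefore` / `existsAfter` from the first and last change type are all my assumptions, not the PR's actual `CdcNetChangesStatefulProcessor` logic.

```scala
// Plain-Scala model of the (existedBefore, existsAfter) collapse; no Spark dependency.
// Every name in this object is hypothetical and exists only for illustration.
object NetChangeSketch {
  // Assumed change-type labels; the connector's real labels may differ.
  sealed trait ChangeType
  case object Insert extends ChangeType
  case object Delete extends ChangeType
  case object UpdatePreimage extends ChangeType
  case object UpdatePostimage extends ChangeType

  final case class Event(rowId: String, changeType: ChangeType, value: String)

  // Assumption: the row existed before the window if its first event
  // removes or replaces an earlier image.
  def existedBefore(first: Event): Boolean = first.changeType match {
    case Delete | UpdatePreimage => true
    case _ => false
  }

  // Assumption: the row exists after the window if its last event
  // leaves an image behind.
  def existsAfter(last: Event): Boolean = last.changeType match {
    case Insert | UpdatePostimage => true
    case _ => false
  }

  // Collapse the buffered first and last events into 0, 1, or 2 output rows.
  def collapse(first: Event, last: Event): Seq[Event] =
    (existedBefore(first), existsAfter(last)) match {
      case (false, false) => Seq.empty                            // created and removed inside the window
      case (false, true)  => Seq(last.copy(changeType = Insert))  // net insert
      case (true, false)  => Seq(first.copy(changeType = Delete)) // net delete
      case (true, true)   =>                                      // net update: two rows
        Seq(first.copy(changeType = UpdatePreimage),
            last.copy(changeType = UpdatePostimage))
    }
}
```

Under these assumptions, an insert followed by a delete of the same row collapses to no rows when both events sit in one window (`collapse(ins, del)`), which is the batch result over the whole range. If the timer fires between the two commits, each event is collapsed on its own (`collapse(ins, ins)` and then `collapse(del, del)`) and the stream emits two rows that batch would not. That is the divergence the Scaladoc describes.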
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]