gengliangwang commented on code in PR #55637:
URL: https://github.com/apache/spark/pull/55637#discussion_r3172131326
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -197,6 +231,188 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
removeHelperColumns(modifiedPlan)
}
+ /**
+ * Streaming counterpart of [[addRowLevelPostProcessing]].
+ *
+ * ==Why a different shape from the batch path?==
+ *
+ * The batch rewrite is Window-based:
+ * {{{
+ * DataSourceV2Relation
+ * -> Window partitioned by (rowId..., _commit_version)
+ * -> [Filter (carry-over)]
+ * -> [Project (update relabel)]
+ * -> Project (drop helper columns)
+ * }}}
+ * [[org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker]] rejects
+ * `Window` on streaming queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`).
+ * Replacing it with a plain [[Aggregate]] is not enough on its own: an aggregate
+ * collapses each group to a single row, losing the per-input rows we still need to
+ * relabel/filter; and an append-mode streaming aggregate without an event-time
+ * watermark on a grouping key is itself rejected by the checker.
+ *
+ * ==The rewritten plan==
+ *
+ * Two adjustments over the naive substitution: (a) inject an [[EventTimeWatermark]]
+ * on `_commit_timestamp` (zero delay) so the aggregate is legal in append mode, and
+ * (b) buffer every input row of a group as `Inline`-able structs and re-explode after
+ * the aggregate so no rows are lost.
+ * {{{
+ * DataSourceV2Relation
+ * -> EventTimeWatermark(_commit_timestamp, 0s)
+ * -> Aggregate
+ * group by (rowId..., _commit_version, _commit_timestamp)
+ * aggs : _del_cnt, _ins_cnt
+ * [, _min_rv, _max_rv, _rv_cnt (carry-over removal only)]
+ * , __spark_cdc_events = collect_list(struct(*))
+ * -> [Filter (carry-over: _del_cnt=1 AND _ins_cnt=1
+ * AND _rv_cnt=2 AND _min_rv=_max_rv)]
+ * -> Generate(Inline(__spark_cdc_events)) // re-emit one row per buffered input
+ * -> [Project (update relabel)]
+ * -> Project (drop helper columns)
+ * }}}
+ *
+ * ==Runtime walkthrough==
+ *
+ * Append-mode streaming aggregates emit a group when its event-time grouping key
+ * falls at or below the global watermark (eviction predicate `eventTime <= watermark`,
+ * applied at the start of the next micro-batch). Suppose three commits with
+ * `_commit_timestamp` 10, 20, 30 each arrive in their own micro-batch:
+ * {{{
+ * batch   max _ts seen   watermark after batch   groups emitted by this batch
+ * -----   ------------   ---------------------   ----------------------------
+ * 1       10             10                      <none>
+ * 2       20             20                      groups with _commit_timestamp == 10
+ * 3       30             30                      groups with _commit_timestamp == 20
+ * end-of-stream          final flush             groups with _commit_timestamp == 30
+ * }}}
+ * Because every row of a single commit shares the same `_commit_timestamp` (CDC
+ * contract), advancing past commit T releases every group whose grouping
+ * `_commit_timestamp` equals T -- one commit's worth of post-processed output per
+ * micro-batch, with the final commit flushed on stream termination.
+ *
+ * ==Per-operator detail==
+ *
+ * 1. [[EventTimeWatermark]] on `_commit_timestamp` (zero delay) -- required so the
+ * downstream stateful aggregate can emit groups in append output mode. By CDC
+ * contract every row in a single commit shares `_commit_timestamp`, so taking it
+ * as event time is safe. Note: this is currently the only analyzer rule that
+ * auto-injects an [[EventTimeWatermark]] (others resolve user-supplied watermarks).
+ * The watermark metadata is preserved on the user-visible `_commit_timestamp`
+ * output (since [[Generate]]'s `generatorOutput` copies attribute metadata), so a
+ * downstream user-supplied `withWatermark` on a different column will interact
+ * with this internal watermark under the global multi-watermark policy.
+ * 2. [[Aggregate]] keyed by `(rowId..., _commit_version, _commit_timestamp)`. Computes
+ * the same `_del_cnt` / `_ins_cnt` / (`_min_rv` / `_max_rv` / `_rv_cnt`) helpers as
+ * the batch path, plus an `__spark_cdc_events` array-of-struct buffering every
+ * input row of the group. `_commit_timestamp` is included in the grouping keys
+ * (besides being a no-op given the contract) to satisfy
+ * [[org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker]]'s
+ * requirement that the watermark attribute appear among grouping expressions for
+ * append-mode streaming aggregations.
+ * 3. [[Filter]] (only when carry-over removal is requested) on the same predicate as
+ * the batch path -- groups with `_del_cnt = 1 AND _ins_cnt = 1 AND _rv_cnt = 2 AND
+ * _min_rv = _max_rv` are dropped wholesale.
+ * 4. [[Generate]] using `Inline(events)` to re-emit one output row per buffered input
+ * row. `unrequiredChildIndex` drops the duplicate grouping columns and the events
+ * buffer; the helper count columns flow through.
+ * 5. [[Project]] (only when update detection is requested) applying the same
+ * `CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION`
+ * guard and `_change_type` relabel as the batch path.
+ * 6. Final [[Project]] (via [[removeHelperColumns]]) drops `__spark_cdc_*` helpers so
+ * the output schema matches the connector's declared schema.
+ */
+ private def addStreamingRowLevelPostProcessing(
Review Comment:
Good call -- added a matching analyzer-level guard for the row-level path.
The streaming row-level rewrite now starts with a `Filter` that raises
`CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP` (via `RaiseError`) on any
row with a NULL `_commit_timestamp`, mirroring the runtime guard in
`CdcNetChangesStatefulProcessor`.
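
As a rough illustration of the fail-fast behavior (plain Scala, not the actual Catalyst plan): the sketch below passes rows through unchanged and raises on the first row whose timestamp is missing. `ChangeRow`, `NullGuardSketch`, `requireCommitTimestamp`, and the exception message format are made-up names for this example only; `None` stands in for a SQL NULL.

```scala
// Plain-Scala model of a fail-fast NULL guard; this is not Spark code.
// ChangeRow and requireCommitTimestamp are hypothetical names for illustration.
final case class ChangeRow(rowId: String, commitVersion: Long, commitTimestamp: Option[Long])

object NullGuardSketch {
  // Pass rows through unchanged, but raise on the first row with a missing timestamp,
  // analogous to a filter whose condition raises an error for NULL input.
  def requireCommitTimestamp(rows: Iterator[ChangeRow]): Iterator[ChangeRow] =
    rows.map { row =>
      if (row.commitTimestamp.isEmpty) {
        // Message format is an assumption, not the real Spark error text.
        throw new IllegalStateException(
          s"CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP: rowId=${row.rowId}")
      }
      row
    }

  def main(args: Array[String]): Unit = {
    val ok = List(ChangeRow("a", 1L, Some(10L)), ChangeRow("b", 1L, Some(10L)))
    println(requireCommitTimestamp(ok.iterator).size)

    val bad = ok :+ ChangeRow("c", 2L, None)
    val failed =
      try { requireCommitTimestamp(bad.iterator).size; false }
      catch { case _: IllegalStateException => true }
    println(failed)
  }
}
```

The point of the design is that the check runs before any stateful operator sees the row, so a contract violation surfaces as an error rather than as missing output.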
Without the guard, a NULL `_commit_timestamp` would silently stall the
row-level path. The downstream Aggregate uses `_commit_timestamp` as both the
watermark column and a grouping key, and the `eventTime <= watermark` eviction
predicate is never satisfied for a NULL key, so the group sits in state
forever, producing no output and no error. Failing fast at the analyzer level
surfaces the contract violation immediately.
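
The stall can be reproduced in a small plain-Scala model of append-mode eviction with a zero-delay watermark. This is a sketch under simplifying assumptions, not Spark's state store: `StallSketch` and `run` are hypothetical names, each group is reduced to its timestamp key, and `None` stands in for a NULL `_commit_timestamp`.

```scala
// Plain-Scala model of append-mode eviction with a zero-delay watermark; not Spark code.
// StallSketch and run are hypothetical names; None stands in for a SQL NULL timestamp.
object StallSketch {
  // Each micro-batch is the list of commit timestamps of the rows it carries.
  // Returns the group keys emitted per batch and the keys still held in state at the end.
  def run(batches: List[List[Option[Long]]]): (List[Set[Long]], Set[Option[Long]]) = {
    var watermark = Long.MinValue          // advanced after each batch
    var state = Set.empty[Option[Long]]    // one entry per buffered group key
    val emitted = batches.map { batch =>
      // Eviction runs first, using the watermark left by the previous batch.
      // `eventTime <= watermark` is never true for a NULL key: exists is false on None.
      val evictable = state.filter(_.exists(_ <= watermark))
      state = (state -- evictable) ++ batch
      val seen = batch.flatten
      if (seen.nonEmpty) watermark = math.max(watermark, seen.max)
      evictable.flatten
    }
    (emitted, state)
  }

  def main(args: Array[String]): Unit = {
    val (emitted, leftover) =
      run(List(List(Some(10L)), List(Some(20L), None), List(Some(30L)), List(Some(40L))))
    println(emitted)
    println(leftover)
  }
}
```

In this model the groups for timestamps 10, 20, and 30 are each emitted one batch after they arrive, while the `None`-keyed group from the second batch is still in state after the last batch, with no error raised along the way.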
The guard lives on the row-level PR (#55636), since the method under review is
in that PR; this branch picks it up via rebase. Plan-shape and end-to-end tests
were added on #55636 too. Pushed in
https://github.com/apache/spark/commit/791d5ce3246 (row-level PR), now visible
on this PR after the rebase.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]