gengliangwang commented on code in PR #55636:
URL: https://github.com/apache/spark/pull/55636#discussion_r3171766665


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -197,6 +216,188 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
     removeHelperColumns(modifiedPlan)
   }
 
+  /**
+   * Streaming counterpart of [[addRowLevelPostProcessing]].
+   *
+   * ==Why a different shape from the batch path?==
+   *
+   * The batch rewrite is Window-based:
+   * {{{
+   *   DataSourceV2Relation
+   *     -> Window partitioned by (rowId..., _commit_version)
+   *     -> [Filter (carry-over)]
+   *     -> [Project (update relabel)]
+   *     -> Project (drop helper columns)
+   * }}}
+   * [[org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker]] rejects
+   * `Window` on streaming queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`).
+   * Replacing it with a plain [[Aggregate]] is not enough on its own: an aggregate
+   * collapses each group to a single row, losing the per-input rows we still need to
+   * relabel/filter; and an append-mode streaming aggregate without an event-time
+   * watermark on a grouping key is itself rejected by the checker.
+   *
+   * ==The rewritten plan==
+   *
+   * Two adjustments over the naive substitution: (a) inject an [[EventTimeWatermark]]
+   * on `_commit_timestamp` (zero delay) so the aggregate is legal in append mode, and
+   * (b) buffer every input row of a group as `Inline`-able structs and re-explode after
+   * the aggregate so no rows are lost.
+   * {{{
+   *   DataSourceV2Relation
+   *     -> EventTimeWatermark(_commit_timestamp, 0s)
+   *     -> Aggregate
+   *          group by (rowId..., _commit_version, _commit_timestamp)
+   *          aggs    : _del_cnt, _ins_cnt
+   *                    [, _min_rv, _max_rv, _rv_cnt  (carry-over removal only)]
+   *                    , __spark_cdc_events = collect_list(struct(*))
+   *     -> [Filter (carry-over: _del_cnt=1 AND _ins_cnt=1
+   *                             AND _rv_cnt=2 AND _min_rv=_max_rv)]
+   *     -> Generate(Inline(__spark_cdc_events))   // re-emit one row per buffered input
+   *     -> [Project (update relabel)]
+   *     -> Project (drop helper columns)
+   * }}}
+   *
+   * ==Runtime walkthrough==
+   *
+   * Append-mode streaming aggregates emit a group when its event-time grouping key
+   * falls at or below the global watermark (eviction predicate `eventTime <= watermark`,
+   * applied at the start of the next micro-batch). Suppose three commits with
+   * `_commit_timestamp` 10, 20, 30 each arrive in their own micro-batch:
+   * {{{
+   *   batch  max _ts seen  watermark after batch  groups emitted by this batch
+   *   -----  ------------  ---------------------  ----------------------------
+   *     1         10                10            <none>
+   *     2         20                20            groups with _commit_timestamp == 10
+   *     3         30                30            groups with _commit_timestamp == 20
+   *   end-of-stream final flush                   groups with _commit_timestamp == 30
+   * }}}
+   * Because every row of a single commit shares the same `_commit_timestamp` (CDC
+   * contract), advancing past commit T releases every group whose grouping
+   * `_commit_timestamp` equals T -- one commit's worth of post-processed output per
+   * micro-batch, with the final commit flushed on stream termination.
+   *
+   * ==Per-operator detail==
+   *
+   *  1. [[EventTimeWatermark]] on `_commit_timestamp` (zero delay) -- required so the
+   *     downstream stateful aggregate can emit groups in append output mode. By CDC
+   *     contract every row in a single commit shares `_commit_timestamp`, so taking it
+   *     as event time is safe. Note: this is currently the only analyzer rule that
+   *     auto-injects an [[EventTimeWatermark]] (others resolve user-supplied watermarks).
+   *     The watermark metadata is preserved on the user-visible `_commit_timestamp`

Review Comment:
   Right, fixed in dee5e84. Added a final Project at the boundary of the 
streaming rewrite (`stripCommitTimestampWatermarkMetadata`) that recreates 
`_commit_timestamp` with `EventTimeWatermark.delayKey` removed from its 
metadata. The watermark is still preserved internally on the Aggregate's 
grouping attribute, so the rewrite keeps working, but it no longer leaks to the 
user-visible output. As a result, a downstream `withWatermark` on a different 
column won't interact with our auto-injected watermark via the global 
multi-watermark policy. New plan-shape test: "watermark metadata is stripped 
from user-visible _commit_timestamp".
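
   For readers following along, the metadata-stripping step can be sketched
   roughly as below. This is a minimal illustration, not the code from the
   commit: the object and helper names (`WatermarkMetadataSketch`,
   `withoutWatermarkDelay`) are hypothetical, it assumes spark-catalyst is on
   the classpath, and it only shows how the `EventTimeWatermark.delayKey` entry
   can be dropped from a column's `Metadata` while other entries are kept. The
   real rule would additionally wrap the attribute in a Project.

   ```scala
   import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
   import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

   // Hypothetical sketch; not the helper added in the PR.
   object WatermarkMetadataSketch {

     // Returns a copy of `md` without the watermark delay entry.
     // All other metadata keys are carried over unchanged.
     def withoutWatermarkDelay(md: Metadata): Metadata =
       new MetadataBuilder()
         .withMetadata(md)
         .remove(EventTimeWatermark.delayKey)
         .build()

     // Example metadata as an auto-injected zero-delay watermark might leave it,
     // plus an unrelated entry (assumed here purely for illustration).
     val tagged: Metadata =
       new MetadataBuilder()
         .putLong(EventTimeWatermark.delayKey, 0L)
         .putString("comment", "commit time")
         .build()

     val stripped: Metadata = withoutWatermarkDelay(tagged)
   }
   ```

   Copying through `MetadataBuilder.withMetadata` and then calling `remove`
   leaves unrelated metadata intact, which seems preferable to rebuilding the
   metadata from scratch.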



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

