gengliangwang commented on code in PR #55637:
URL: https://github.com/apache/spark/pull/55637#discussion_r3176006195


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -600,6 +615,114 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
     filteredAndRelabeledPlan
   }
 
+  /**
+   * Streaming counterpart of [[injectNetChangeComputation]]. The batch version uses a
+   * Catalyst `Window` partitioned by `rowId`, which is rejected on streaming queries.
+   * This version delegates the per-`rowId` first/last extraction and the SPIP collapse
+   * matrix to a [[CdcNetChangesStatefulProcessor]] driven by `transformWithState`:
+   *
+   *  1. [[EventTimeWatermark]] on `_commit_timestamp` (zero delay) so the global query
+   *     watermark advances with each batch. When this rewrite runs on top of the row-level
+   *     post-processing rewrite (combined `containsCarryoverRows` /
+   *     `representsUpdateAsDeleteAndInsert` + `containsIntermediateChanges` path), the
+   *     row-level rewrite has already injected an identical `EventTimeWatermark` and we
+   *     reuse it instead of stacking a second one. Stacking watermarks on the same column
+   *     fails the multi-watermark check unless `STATEFUL_OPERATOR_ALLOW_MULTIPLE` is set,
+   *     and even then it would just produce two redundant nodes.
+   *  2. [[Project]] that aliases each rowId expression to a top-level helper column. This
+   *     lets us address the rowId as an `Attribute` for the `transformWithState` grouping,
+   *     which in turn makes nested rowId paths (e.g. `payload.id`) work without special
+   *     casing.
+   *  3. [[TransformWithState]] keyed by the rowId helper attributes, in
+   *     [[org.apache.spark.sql.catalyst.plans.logical.EventTime]] mode. The processor
+   *     buffers the first and last event per row identity; an event-time timer set to the
+   *     latest observed `_commit_timestamp` fires once the global watermark advances past
+   *     it, at which point the processor evaluates the SPIP `(existedBefore, existsAfter)`
+   *     matrix and emits 0, 1, or 2 output rows.
+   *  4. [[SerializeFromObject]] (added by the `transformWithState` factory) brings the
+   *     processor's `Row` outputs back into a regular tabular shape.
+   *  5. Final [[Project]] drops the rowId helper columns so the user-visible schema
+   *     matches the connector's declared changelog schema.
+   *
+   * Streaming netChanges is incremental, not range-scoped: per-row-identity state is
+   * cleared on emission, so a later commit on the same identity starts a fresh window
+   * and produces additional output rows. Batch netChanges over the same version range
+   * would have collapsed those changes; streaming cannot retract already-emitted rows
+   * to match that. End-of-stream flushes all pending timers, so a bounded stream's
+   * output matches batch only when no row identity is touched again after its first
+   * emission.
+   */
+  private def addStreamingNetChangeComputation(

Review Comment:
   Good catch -- the netChanges-only path was indeed slipping past the existing 
CDC output-mode guard.
   
   What I found: `UnsupportedOperationChecker.checkForStreaming` already had an
   explicit guard (lines 293-301) that rejects non-Append output modes for the
   row-level rewrite by detecting the `__spark_cdc_events` aggregate. Complete
   mode on a netChanges-only stream is rejected by the generic
   `InternalOutputModes.Complete if aggregates.isEmpty` check (lines 276-278),
   since the netChanges-only plan has no streaming `Aggregate`. **But Update mode
   on a netChanges-only stream was passing analysis** -- the injected
   `TransformWithState` produces append output internally, but there is no
   per-operator output-mode compatibility check for `TransformWithState` (unlike
   `FlatMapGroupsWithState` and `applyInPandasWithState`, which do have one).
   
   Fix: extended the existing CDC guard to also detect `TransformWithState` 
driven by `CdcNetChangesStatefulProcessor`:
   
   ```scala
   val containsCdcEventsAggregate = aggregates.exists(...) // existing
   val containsCdcNetChangesProcessor = plan.exists {
     case t: TransformWithState if t.isStreaming &&
       t.statefulProcessor.isInstanceOf[CdcNetChangesStatefulProcessor] => true
     case _ => false
   }
   if (outputMode != Append &&
       (containsCdcEventsAggregate || containsCdcNetChangesProcessor)) {
     throw QueryCompilationErrors.unsupportedOutputModeForStreamingOperationError(
       outputMode, "Change Data Capture (CDC) streaming reads with post-processing")
   }
   ```
   
   Update and Complete modes on a netChanges-only stream now both raise
   `STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION` with the existing CDC-specific
   message, matching the row-level path's behaviour.
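   
   For readers without the Spark source at hand, the shape of the guard can be
   sketched on a toy plan tree. Everything below is a simplified stand-in
   invented for illustration (`Plan`, `Source`, `Processor`,
   `CdcOutputModeGuard`, and the `Either` result are hypothetical); it is not
   Spark's `LogicalPlan`, `OutputMode`, or error API, and it models only the
   netChanges half of the condition:
   
   ```scala
   // Toy model only: these types are illustrative stand-ins, not Spark classes.
   sealed trait OutputMode
   case object Append extends OutputMode
   case object Update extends OutputMode
   case object Complete extends OutputMode

   sealed trait Processor
   // Stand-in for CdcNetChangesStatefulProcessor.
   case object CdcNetChangesProcessor extends Processor
   case object OtherProcessor extends Processor

   sealed trait Plan { def children: Seq[Plan] }
   case class Source(isStreaming: Boolean) extends Plan {
     val children: Seq[Plan] = Nil
   }
   case class Project(child: Plan) extends Plan {
     val children: Seq[Plan] = Seq(child)
   }
   case class TransformWithState(
       processor: Processor, isStreaming: Boolean, child: Plan) extends Plan {
     val children: Seq[Plan] = Seq(child)
   }

   object CdcOutputModeGuard {
     // Pre-order search: true if any node in the tree satisfies the predicate.
     def exists(plan: Plan)(p: Plan => Boolean): Boolean =
       p(plan) || plan.children.exists(c => exists(c)(p))

     // True if the plan contains a streaming TransformWithState driven by the
     // CDC net-changes processor.
     def containsCdcNetChangesProcessor(plan: Plan): Boolean = exists(plan) {
       case TransformWithState(CdcNetChangesProcessor, true, _) => true
       case _ => false
     }

     // Left(message) models the analysis error; Right(()) means the plan passes.
     def check(plan: Plan, mode: OutputMode): Either[String, Unit] =
       if (mode != Append && containsCdcNetChangesProcessor(plan)) {
         Left(s"Output mode $mode is not supported for CDC streaming reads " +
           "with post-processing")
       } else {
         Right(())
       }
   }
   ```
   
   In this sketch a plan such as
   `Project(TransformWithState(CdcNetChangesProcessor, true, Source(true)))`
   passes `check` only with `Append`, while a `TransformWithState` driven by any
   other processor is left to whatever other checks apply.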
   
   Added two end-to-end tests in `ChangelogEndToEndSuite` mirroring the 
row-level rejection tests:
   - `streaming netChanges with update output mode is rejected`
   - `streaming netChanges with complete output mode is rejected`
   
   Pushed in 521565c0335.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

