gengliangwang commented on code in PR #55637:
URL: https://github.com/apache/spark/pull/55637#discussion_r3176005290


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -600,6 +615,114 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
     filteredAndRelabeledPlan
   }
 
+  /**
+   * Streaming counterpart of [[injectNetChangeComputation]]. The batch version uses a
+   * Catalyst `Window` partitioned by `rowId`, which is rejected on streaming queries.
+   * This version delegates the per-`rowId` first/last extraction and the SPIP collapse
+   * matrix to a [[CdcNetChangesStatefulProcessor]] driven by `transformWithState`:
+   *
+   *  1. [[EventTimeWatermark]] on `_commit_timestamp` (zero delay) so the global query
+   *     watermark advances with each batch. When this rewrite runs on top of the row-level
+   *     post-processing rewrite (combined `containsCarryoverRows` /
+   *     `representsUpdateAsDeleteAndInsert` + `containsIntermediateChanges` path), the
+   *     row-level rewrite has already injected an identical `EventTimeWatermark` and we
+   *     reuse it instead of stacking a second one. Stacking watermarks on the same column
+   *     fails the multi-watermark check unless `STATEFUL_OPERATOR_ALLOW_MULTIPLE` is set,
+   *     and even then it would just produce two redundant nodes.
+   *  2. [[Project]] that aliases each rowId expression to a top-level helper column. This
+   *     lets us address the rowId as an `Attribute` for the `transformWithState` grouping,
+   *     which in turn makes nested rowId paths (e.g. `payload.id`) work without special
+   *     casing.
+   *  3. [[TransformWithState]] keyed by the rowId helper attributes, in
+   *     [[org.apache.spark.sql.catalyst.plans.logical.EventTime]] mode. The processor
+   *     buffers the first and last event per row identity; an event-time timer set to the
+   *     latest observed `_commit_timestamp` fires once the global watermark advances past
+   *     it, at which point the processor evaluates the SPIP `(existedBefore, existsAfter)`
+   *     matrix and emits 0, 1, or 2 output rows.
+   *  4. [[SerializeFromObject]] (added by the `transformWithState` factory) brings the
+   *     processor's `Row` outputs back into a regular tabular shape.
+   *  5. Final [[Project]] drops the rowId helper columns so the user-visible schema
+   *     matches the connector's declared changelog schema.
+   *
+   * Streaming netChanges is incremental, not range-scoped: per-row-identity state is
+   * cleared on emission, so a later commit on the same identity starts a fresh window
+   * and produces additional output rows. Batch netChanges over the same version range
+   * would have collapsed those changes; streaming cannot retract already-emitted rows
+   * to match that. End-of-stream flushes all pending timers, so a bounded stream's
+   * output matches batch only when no row identity is touched again after its first
+   * emission.
+   */
+  private def addStreamingNetChangeComputation(
+      plan: LogicalPlan,
+      cl: Changelog,
+      computeUpdates: Boolean): LogicalPlan = {
+    // 1. Inject (or reuse, if already injected by the row-level rewrite) a watermark on
+    //    `_commit_timestamp`. The row-level rewrite already adds one with zero delay, so
+    //    we only add it when no watermark is present in the lineage to avoid stacking
+    //    EventTimeWatermark nodes (which is rejected by the multi-watermark check
+    //    unless STATEFUL_OPERATOR_ALLOW_MULTIPLE is set).
+    val needsWatermark = !plan.exists {
+      case _: EventTimeWatermark => true
+      case _ => false
+    }
+    val watermarked: LogicalPlan = if (needsWatermark) {
+      val rawCommitTsAttr = getAttribute(plan, "_commit_timestamp")
+      EventTimeWatermark(
+        UUID.randomUUID(), rawCommitTsAttr, new CalendarInterval(0, 0, 0L), plan)
+    } else plan
+
+    // 2. Resolve rowId expressions against the watermarked plan. Resolving here (after
+    //    any preceding row-level rewrite) ensures the attribute ExprIds match the
+    //    columns in `plan.output` -- name-based resolution recovers them by their
+    //    connector-declared names. Then project them to top-level helper columns so
+    //    they can be referenced as plain Attributes by `transformWithState`'s grouping.
+    val rowIdExprs =
+      V2ExpressionUtils.resolveRefs[NamedExpression](cl.rowId().toSeq, watermarked)
+    val rowIdHelpers: Seq[Alias] = rowIdExprs.zipWithIndex.map { case (expr, idx) =>
+      Alias(expr, NetChangesHelperColumns.rowIdColumn(idx))()
+    }
+    val originalCols: Seq[Attribute] = watermarked.output
+    val withHelpers = Project(originalCols ++ rowIdHelpers, watermarked)
+
+    // 3. Build the input/output Row encoder for the processor. The schema is the
+    //    watermarked plan's schema plus the rowId helper columns.
+    val processorInputSchema = StructType(
+      withHelpers.output.map { a =>
+        StructField(a.name, a.dataType, a.nullable, a.metadata)
+      })
+    val rowEncoder = ExpressionEncoder(processorInputSchema)
+    val groupingAttrs: Seq[Attribute] = rowIdHelpers.map(_.toAttribute)
+    val keyEncoder = ExpressionEncoder(StructType(rowIdHelpers.map { a =>
+      StructField(a.name, a.dataType, a.nullable, a.metadata)
+    }))
+
+    val processor = new CdcNetChangesStatefulProcessor(processorInputSchema, computeUpdates)
+
+    val tws = new TransformWithState(
+      keyDeserializer = UnresolvedDeserializer(keyEncoder.deserializer, groupingAttrs),
+      valueDeserializer = UnresolvedDeserializer(rowEncoder.deserializer, withHelpers.output),
+      groupingAttributes = groupingAttrs,
+      dataAttributes = withHelpers.output,
+      statefulProcessor = processor.asInstanceOf[StatefulProcessor[Any, Any, Any]],
+      timeMode = EventTime,
+      outputMode = OutputMode.Append(),
+      keyEncoder = keyEncoder.asInstanceOf[ExpressionEncoder[Any]],
+      outputObjAttr = CatalystSerde.generateObjAttr(rowEncoder),
+      child = withHelpers,
+      hasInitialState = false,
+      initialStateGroupingAttrs = groupingAttrs,
+      initialStateDataAttrs = withHelpers.output,
+      initialStateDeserializer = UnresolvedDeserializer(keyEncoder.deserializer, groupingAttrs),
+      initialState = LocalRelation(keyEncoder.schema))
+
+    // 4. Wrap with SerializeFromObject so the obj column becomes regular tabular output.
+    val serialized = CatalystSerde.serialize(tws)(rowEncoder)
+
+    // 5. Drop the rowId helper columns so the final output matches the connector's schema.
+    val helperNames = rowIdHelpers.map(_.name).toSet
+    Project(serialized.output.filterNot(a => helperNames.contains(a.name)), serialized)

Review Comment:
   Good catch -- applied `stripCommitTimestampWatermarkMetadata` at the end of `addStreamingNetChangeComputation`. The auto-injected `EventTimeWatermark.delayKey` metadata survives the `transformWithState` encoder roundtrip (the encoder schema carries `StructField` metadata), so without an explicit strip the user-visible `_commit_timestamp` would still carry it on the netChanges-only path. The strip runs before the helper-column drop, mirroring the row-level rewrite's call at the end of `addStreamingRowLevelPostProcessing`.
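
   As a rough illustration of why the strip is needed, here is a minimal, dependency-free sketch. It is a simplified model, not the code in this PR: `Field`, `encoderRoundtrip`, and `stripWatermarkMetadata` are hypothetical stand-ins for `StructField`, the encoder roundtrip, and `stripCommitTimestampWatermarkMetadata`, and the `DelayKey` string is assumed to match the value of `EventTimeWatermark.delayKey`.

   ```scala
   // Simplified model using plain Scala types; none of these names exist in Spark.
   object WatermarkMetadataStripSketch {
     // Assumption: mirrors the value of EventTimeWatermark.delayKey.
     val DelayKey = "spark.watermarkDelayMs"

     // Hypothetical stand-in for StructField(name, ..., metadata).
     final case class Field(name: String, metadata: Map[String, String])

     // Models the encoder roundtrip: field metadata is carried through unchanged.
     def encoderRoundtrip(schema: Seq[Field]): Seq[Field] =
       schema.map(f => f.copy())

     // Hypothetical analogue of the strip: remove the delay key from one column only.
     def stripWatermarkMetadata(schema: Seq[Field], column: String): Seq[Field] =
       schema.map { f =>
         if (f.name == column) f.copy(metadata = f.metadata - DelayKey) else f
       }
   }
   ```

   In this model, a `_commit_timestamp` field that enters the roundtrip with `DelayKey` set still has it afterwards, and only the explicit strip removes it; other columns are left untouched.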
   
   Added a netChanges-only plan-shape assertion in `ResolveChangelogTableStreamingPostProcessingSuite` (`assertNoWatermarkMetadataOnOutput(analyzed)` in the existing "netChanges alone injects watermark + TransformWithState" test) so that a regression in either the strip or the netChanges path's encoder-metadata behavior is caught immediately.
   
   Pushed in 521565c0335.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CdcNetChangesStatefulProcessor.scala:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{Encoder, Row}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.connector.catalog.Changelog
+import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * StatefulProcessor that incrementalises CDC net-change computation for streaming reads.
+ *
+ * The batch path (`ResolveChangelogTable.injectNetChangeComputation`) uses a Catalyst
+ * `Window` partitioned by `rowId` and ordered by `(_commit_version, change_type_rank)` to
+ * extract the first and last events per row identity, then applies the SPIP collapse
+ * matrix on `(existedBefore, existsAfter)`. That `Window` is rejected on streaming
+ * queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`).
+ *
+ * This processor reproduces the same semantics with `transformWithState`. Per-row-identity

Review Comment:
   Done -- narrowed the wording to "reuses the same SPIP collapse matrix with `transformWithState`, applied per watermark window rather than over the full requested version range", and added a forward reference to the per-window-vs-range-scoped paragraph below. The class-level Scaladoc no longer claims the same semantics as the batch `Window`-based path.
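
   To make the per-window vs. range-scoped difference concrete, here is a small sketch. It is an assumption-laden toy, not the processor's implementation: the `collapse` function, the three-value `Change` type, and the output labels are all hypothetical, and the matrix is only my reading of the `(existedBefore, existsAfter)` description (derived from the first and last event of each group).

   ```scala
   // Toy model of the collapse matrix; all names here are illustrative only.
   object NetChangeWindowSketch {
     sealed trait Change
     case object Insert extends Change
     case object Update extends Change
     case object Delete extends Change

     final case class Event(version: Long, change: Change)

     // Assumed matrix: emits 0, 1, or 2 rows per row identity and group of events.
     def collapse(events: Seq[Event]): Seq[String] = {
       val sorted = events.sortBy(_.version)
       (sorted.headOption, sorted.lastOption) match {
         case (Some(first), Some(last)) =>
           val existedBefore = first.change != Insert
           val existsAfter = last.change != Delete
           (existedBefore, existsAfter) match {
             case (false, false) => Seq.empty
             case (false, true) => Seq("insert")
             case (true, false) => Seq("delete")
             case (true, true) => Seq("update_preimage", "update_postimage")
           }
         case _ => Seq.empty
       }
     }

     // One row identity: inserted at version 1, deleted at version 2.
     val events: Seq[Event] = Seq(Event(1L, Insert), Event(2L, Delete))

     // Batch: one collapse over the whole requested range.
     val batchOutput: Seq[String] = collapse(events)

     // Streaming: state is cleared on emission, so each window collapses alone.
     val streamingOutput: Seq[String] = events.map(e => Seq(e)).flatMap(collapse)
   }
   ```

   Under these assumptions the batch collapse over both versions emits nothing, while the two streaming windows emit an insert followed by a delete, which is the divergence the narrowed Scaladoc wording now spells out.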
   
   Pushed in 521565c0335.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]