gengliangwang commented on code in PR #55636:
URL: https://github.com/apache/spark/pull/55636#discussion_r3174334651
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -197,6 +216,252 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
removeHelperColumns(modifiedPlan)
}
+ /**
+ * Streaming counterpart of [[addRowLevelPostProcessing]].
+ *
+ * ==Why a different shape from the batch path?==
+ *
+ * The batch rewrite is Window-based:
+ * {{{
+ * DataSourceV2Relation
+ * -> Window partitioned by (rowId..., _commit_version)
+ * -> [Filter (carry-over)]
+ * -> [Project (update relabel)]
+ * -> Project (drop helper columns)
+ * }}}
+ * [[org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker]] rejects
+ * `Window` on streaming queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`).
+ * Replacing it with a plain [[Aggregate]] is not enough on its own: an aggregate
+ * collapses each group to a single row, losing the per-input rows we still need to
+ * relabel/filter; and an append-mode streaming aggregate without an event-time
+ * watermark on a grouping key is itself rejected by the checker.
+ *
+ * ==The rewritten plan==
+ *
+ * Two adjustments over the naive substitution: (a) inject an [[EventTimeWatermark]]
+ * on `_commit_timestamp` (zero delay) so the aggregate is legal in append mode, and
+ * (b) buffer every input row of a group as `Inline`-able structs and re-explode after
+ * the aggregate so no rows are lost.
+ * {{{
+ * DataSourceV2Relation
+ * -> Filter (RaiseError on NULL _commit_timestamp)
+ * -> EventTimeWatermark(_commit_timestamp, 0s)
+ * -> Aggregate
+ *      group by (rowId..., _commit_version, _commit_timestamp)
+ *      aggs : _del_cnt, _ins_cnt
+ *             [, _min_rv, _max_rv, _rv_cnt (carry-over removal only)]
+ *             , __spark_cdc_events = collect_list(struct(*))
+ * -> [Filter (carry-over: _del_cnt=1 AND _ins_cnt=1
+ *             AND _rv_cnt=2 AND _min_rv=_max_rv)]
+ * -> Generate(Inline(__spark_cdc_events)) // re-emit one row per buffered input
+ * -> [Project (update relabel)]
+ * -> Project (drop helper columns)
+ * -> Project (strip internal EventTimeWatermark metadata)
+ * }}}
+ *
+ * ==Runtime walkthrough==
+ *
+ * Append-mode streaming aggregates emit a group when its event-time grouping key
+ * falls at or below the global watermark (eviction predicate `eventTime <= watermark`,
+ * applied at the start of the next micro-batch). Suppose three commits with
+ * `_commit_timestamp` 10, 20, 30 each arrive in their own micro-batch:
+ * {{{
+ *   batch          max _ts seen   watermark after batch   groups emitted by this batch
+ *   -------------  ------------   ---------------------   -----------------------------------
+ *   1              10             10                      <none>
+ *   2              20             20                      groups with _commit_timestamp == 10
+ *   3              30             30                      groups with _commit_timestamp == 20
+ *   end-of-stream  final flush                            groups with _commit_timestamp == 30
+ * }}}
+ * Because every row of a single commit shares the same `_commit_timestamp` (CDC
+ * contract), advancing past commit T releases every group whose grouping
+ * `_commit_timestamp` equals T -- one commit's worth of post-processed output per
+ * micro-batch, with the final commit flushed on stream termination.
+ *
+ * ==Per-operator detail==
+ *
+ * 0. [[Filter]] guarding against NULL `_commit_timestamp` -- raises
+ * `CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP` for any row that
+ * violates the contract. A NULL would never satisfy the downstream Aggregate's
+ * `eventTime <= watermark` eviction predicate (NULL is silent in MAX, never
+ * compares less-than-or-equal), so its group would be held in state forever.
+ * Failing fast surfaces the connector bug instead of producing no output.
+ * 1. [[EventTimeWatermark]] on `_commit_timestamp` (zero delay) -- required so the
+ * downstream stateful aggregate can emit groups in append output mode. By CDC
+ * contract every row in a single commit shares `_commit_timestamp`, so taking it
+ * as event time is safe.
+ * 2. [[Aggregate]] keyed by `(rowId..., _commit_version, _commit_timestamp)`. Computes
+ * the same `_del_cnt` / `_ins_cnt` / (`_min_rv` / `_max_rv` / `_rv_cnt`) helpers as
+ * the batch path, plus an `__spark_cdc_events` array-of-struct buffering every
+ * input row of the group. Adding `_commit_timestamp` to the grouping keys does not
+ * change the groups (by contract it is constant within a `_commit_version`); it is
+ * there to satisfy
+ * [[org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker]]'s
+ * requirement that the watermark attribute appear among grouping expressions for
+ * append-mode streaming aggregations.
+ * 3. [[Filter]] (only when carry-over removal is requested) on the same predicate as
+ * the batch path -- groups with `_del_cnt = 1 AND _ins_cnt = 1 AND _rv_cnt = 2 AND
+ * _min_rv = _max_rv` are dropped wholesale.
+ * 4. [[Generate]] using `Inline(events)` to re-emit one output row per buffered input
+ * row. `unrequiredChildIndex` drops the duplicate grouping columns and the events
+ * buffer; the helper count columns flow through.
+ * 5. [[Project]] (only when update detection is requested) applying the same
+ * `CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION`
+ * guard and `_change_type` relabel as the batch path.
+ * 6. [[Project]] (via [[removeHelperColumns]]) drops `__spark_cdc_*` helpers so
+ * the output schema matches the connector's declared schema.
+ * 7. Final [[Project]] (via [[stripCommitTimestampWatermarkMetadata]]) clears the
+ * `EventTimeWatermark.delayKey` from the user-visible `_commit_timestamp`
+ * attribute so a downstream user-supplied `withWatermark` on a different column
+ * does not interact with our internal watermark via the global multi-watermark
+ * policy.
+ */
+ private def addStreamingRowLevelPostProcessing(
+ plan: LogicalPlan,
+ cl: Changelog,
+ requiresCarryOverRemoval: Boolean,
+ requiresUpdateDetection: Boolean): LogicalPlan = {
+ // Fail fast on a NULL `_commit_timestamp`. The downstream Aggregate uses it as
+ // both an event-time watermark column and a grouping key; a NULL group-key value
+ // would never satisfy the `eventTime <= watermark` eviction predicate, so the
+ // group would silently stall (held in state until end of stream). Mirrors the
+ // runtime check in [[CdcNetChangesStatefulProcessor]] -- fail fast at the
+ // contract violation rather than producing no output.
+ val plan1 = addNullCommitTimestampGuard(plan)
+ val rawCommitTsAttr = getAttribute(plan1, "_commit_timestamp")
+ val watermarked = EventTimeWatermark(
+ UUID.randomUUID(), rawCommitTsAttr, new CalendarInterval(0, 0, 0L), plan1)
+
+ val rowIdExprs = V2ExpressionUtils.resolveRefs[NamedExpression](
+ cl.rowId().toSeq, watermarked)
+ val commitVersionAttr = getAttribute(watermarked, "_commit_version")
+ // Pick up the post-watermark `_commit_timestamp` attribute -- it carries the
+ // EventTimeWatermark.delayKey metadata that UnsupportedOperationChecker scans for.
+ val commitTimestampAttr = getAttribute(watermarked, "_commit_timestamp")
+ val changeTypeAttr = getAttribute(watermarked, "_change_type")
+
+ val groupingExprs: Seq[Expression] =
+ rowIdExprs ++ Seq(commitVersionAttr, commitTimestampAttr)
+ val groupingNamedExprs: Seq[NamedExpression] =
+ groupingExprs.map(_.asInstanceOf[NamedExpression])
+
+ val insertIf = If(EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_INSERT)),
+ Literal(1), Literal(null, IntegerType))
+ val deleteIf = If(EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_DELETE)),
+ Literal(1), Literal(null, IntegerType))
+ val delCntAlias = Alias(
+ Count(Seq(deleteIf)).toAggregateExpression(), HelperColumn.DelCnt)()
+ val insCntAlias = Alias(
+ Count(Seq(insertIf)).toAggregateExpression(), HelperColumn.InsCnt)()
+
+ val rvAliases = if (requiresCarryOverRemoval) {
+ val rowVersionExpr = V2ExpressionUtils.resolveRef[NamedExpression](
+ cl.rowVersion(), watermarked)
+ Seq(
+ Alias(Min(rowVersionExpr).toAggregateExpression(), HelperColumn.MinRv)(),
+ Alias(Max(rowVersionExpr).toAggregateExpression(), HelperColumn.MaxRv)(),
+ Alias(Count(Seq(rowVersionExpr)).toAggregateExpression(), HelperColumn.RvCnt)())
+ } else Seq.empty
+
+ // Buffer every input row as a struct so Inline can re-emit them after the aggregate.
+ // The grouping-key columns (rowId..., `_commit_version`, `_commit_timestamp`) appear
+ // both inside the struct and as top-level grouping outputs; the top-level duplicates
+ // are dropped via `unrequiredChildIndex` below.
+ val structOfAllCols = CreateStruct(watermarked.output)
+ val eventsAlias = Alias(
+ new CollectList(structOfAllCols).toAggregateExpression(), HelperColumn.Events)()
+
+ val aggregateExprs: Seq[NamedExpression] =
+ groupingNamedExprs ++ Seq(delCntAlias, insCntAlias) ++ rvAliases :+ eventsAlias
+ val aggregated = Aggregate(groupingExprs, aggregateExprs, watermarked)
+
+ val filtered: LogicalPlan = if (requiresCarryOverRemoval) {
+ val delCnt = getAttribute(aggregated, HelperColumn.DelCnt)
+ val insCnt = getAttribute(aggregated, HelperColumn.InsCnt)
+ val minRv = getAttribute(aggregated, HelperColumn.MinRv)
+ val maxRv = getAttribute(aggregated, HelperColumn.MaxRv)
+ val rvCnt = getAttribute(aggregated, HelperColumn.RvCnt)
+ val isCarryoverPair = And(
+ And(EqualTo(delCnt, Literal(1L)), EqualTo(insCnt, Literal(1L))),
+ And(EqualTo(rvCnt, Literal(2L)), EqualTo(minRv, maxRv)))
+ Filter(Not(isCarryoverPair), aggregated)
+ } else aggregated
+
+ // Inline the struct array back into rows. Drop the events column (consumed by Inline)
+ // and the grouping-key columns (re-emitted from inside the struct) so the final shape
+ // matches the connector's schema plus the surviving helper count columns.
+ val eventsAttr = getAttribute(filtered, HelperColumn.Events)
+ val groupingAttrSet = AttributeSet(groupingNamedExprs.map(_.toAttribute))
+ val unrequiredChildIndex: Seq[Int] = filtered.output.zipWithIndex.collect {
+ case (a, i) if a.exprId == eventsAttr.exprId => i
+ case (a, i) if groupingAttrSet.contains(a) => i
+ }
+ val generatorOutput: Seq[Attribute] = watermarked.output.map { col =>
+ AttributeReference(col.name, col.dataType, col.nullable, col.metadata)()
+ }
+ val generated = Generate(
+ Inline(eventsAttr),
+ unrequiredChildIndex = unrequiredChildIndex,
+ outer = false,
+ qualifier = None,
+ generatorOutput = generatorOutput,
+ child = filtered)
+
+ val withRelabel: LogicalPlan = if (requiresUpdateDetection) {
+ addUpdateRelabelProjection(generated)
+ } else generated
+
+ // Strip the auto-injected EventTimeWatermark metadata from the user-visible
+ // `_commit_timestamp` so it does not interact with downstream user-supplied
+ // watermarks via the global multi-watermark policy. The metadata flows through
+ // Generate(Inline) (which copies attribute metadata) and the relabel Project, so
+ // it must be cleared here at the boundary of the rewrite.
+ val cleaned = stripCommitTimestampWatermarkMetadata(withRelabel)
+ removeHelperColumns(cleaned)
+ }
+
+ /**
+ * Adds a `Filter` that raises
+ * `CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP` for any input row whose
+ * `_commit_timestamp` is `NULL`. Used as the first step of the streaming row-level
+ * rewrite so a contract-violating connector fails fast instead of silently stalling
+ * the downstream stateful aggregate's group.
+ */
+ private def addNullCommitTimestampGuard(input: LogicalPlan): LogicalPlan = {
+ val commitTsAttr = getAttribute(input, "_commit_timestamp")
+ val raise = RaiseError(
+ Literal("CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP"),
+ CreateMap(Nil),
+ BooleanType)
+ // CaseWhen returns the default branch (true) for non-null timestamps and
+ // evaluates the side-effecting RaiseError for nulls; the row never passes the
+ // filter on a contract violation.
+ val checkExpr = CaseWhen(Seq(IsNull(commitTsAttr) -> raise), Literal(true))
Review Comment:
Good catch -- you're right that the previous `Filter(CaseWhen(IsNull(c) -> raise, true))` would be eliminated for a non-nullable child: `NullPropagation` folds `IsNull(c)` to `Literal(false)` (`IsNull(c) if !c.nullable -> Literal(false)` at `expressions.scala:920`), after which the `CaseWhen` reduces to `true` and the filter disappears. `AssertNotNull` is likewise dropped for a non-nullable child (line 926). The test catalog declared `_commit_timestamp` nullable, so the existing test wouldn't have caught a regression here.

Fixed in e8db78a by replacing the predicate with a new `CdcAssertCommitTimestampNotNull` Catalyst expression. `NullPropagation` matches specific expression classes rather than a generic shape, so it does not simplify the new expression, and the runtime check stays in place regardless of declared nullability.

Added a plan-shape test that runs `NullPropagation` directly on both `IsNull(c)` (sanity check: it folds to a `Literal` for a non-nullable column) and the new expression (asserted to remain unchanged), with a non-nullable `_commit_timestamp` configured via a new `commitTimestampNullable` knob on `ChangelogProperties`.
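A toy model of the folding chain discussed in this comment may help show why the `CaseWhen` guard vanished while a dedicated node survives. This is a sketch, not Catalyst: `Expr`, `Col`, `Lit`, `IsNull`, `RaiseErr`, `CaseWhen`, `AssertTsNotNull`, and `ToyOptimizer.simplify` are hypothetical stand-ins. It implements only two rewrites, one modeled on `NullPropagation`'s `IsNull(c) if !c.nullable` case and one modeled on the false-branch pruning that Catalyst's `SimplifyConditionals` applies to `CaseWhen`.

```scala
// Toy expression tree: hypothetical stand-ins, NOT Catalyst's classes.
sealed trait Expr
final case class Col(name: String, nullable: Boolean) extends Expr
final case class Lit(value: Any) extends Expr
final case class IsNull(child: Expr) extends Expr
final case class RaiseErr(errorClass: String) extends Expr
final case class CaseWhen(branches: Seq[(Expr, Expr)], elseValue: Expr) extends Expr
// Hypothetical dedicated assertion node; no rule below knows about it.
final case class AssertTsNotNull(child: Expr) extends Expr

object ToyOptimizer {
  private val FalseLit: Expr = Lit(false)

  /** One bottom-up pass applying two rewrites modeled on Catalyst rules. */
  def simplify(e: Expr): Expr = {
    // Rebuild the node with simplified children first.
    val rebuilt = e match {
      case IsNull(c) => IsNull(simplify(c))
      case CaseWhen(bs, els) =>
        CaseWhen(bs.map { case (cond, value) => (simplify(cond), simplify(value)) }, simplify(els))
      case AssertTsNotNull(c) => AssertTsNotNull(simplify(c))
      case leaf => leaf
    }
    rebuilt match {
      // Modeled on NullPropagation: IsNull over a non-nullable column is always false.
      case IsNull(Col(_, false)) => FalseLit
      // Modeled on SimplifyConditionals: drop branches whose condition is literally
      // false; with no branches left, the CaseWhen reduces to its else value.
      case CaseWhen(bs, els) if bs.exists(_._1 == FalseLit) =>
        val kept = bs.filterNot(_._1 == FalseLit)
        if (kept.isEmpty) els else CaseWhen(kept, els)
      case other => other
    }
  }
}
```

With a non-nullable column, `CaseWhen(Seq(IsNull(col) -> RaiseErr(...)), Lit(true))` simplifies to `Lit(true)`, so a filter built on it would keep every row and never raise; with a nullable column, or with the hypothetical `AssertTsNotNull` node, the expression comes back unchanged because no rule matches it.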
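To make the per-operator description in the Scaladoc concrete, the sketch below runs the same steps over an in-memory `Seq` instead of a logical plan: `groupBy` plays the keyed `Aggregate`, each group's buffered `Seq` plays `collect_list(struct(*))`, and `flatMap` plays `Generate(Inline(...))`. It is a hypothetical model, not the PR's code: `Change`, `StreamingRewriteSketch.postProcess`, the single `id` row-ID column, and the `Long` row version are assumptions. The update relabel (one delete plus one insert becomes `update_preimage`/`update_postimage`) and the error on more than one delete or insert per group are assumed stand-ins for the batch path's relabel and `UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION` guard, whose exact rules are not shown in this diff.

```scala
// Hypothetical in-memory model of one CDC row (not a Spark type). `id` stands in
// for the connector's row ID and `rowVersion` for its row-version column.
final case class Change(
    id: Int,
    commitVersion: Long,
    commitTs: Long,
    changeType: String,
    rowVersion: Long,
    value: String)

object StreamingRewriteSketch {
  /** Group, count, optionally drop carry-over pairs, re-explode, optionally relabel. */
  def postProcess(
      rows: Seq[Change],
      removeCarryOver: Boolean,
      detectUpdates: Boolean): Seq[Change] = {
    // Stand-in for the Aggregate keyed by (rowId, _commit_version, _commit_timestamp);
    // each group's Seq plays the role of collect_list(struct(*)).
    val groups = rows.groupBy(r => (r.id, r.commitVersion, r.commitTs))
    // Spark guarantees no output order; sorting only makes this sketch deterministic.
    groups.toSeq.sortBy(_._1).flatMap { case (_, events) =>
      val delCnt = events.count(_.changeType == "delete") // _del_cnt
      val insCnt = events.count(_.changeType == "insert") // _ins_cnt
      val rvs = events.map(_.rowVersion)                  // inputs to _min_rv/_max_rv/_rv_cnt
      val isCarryOver =
        delCnt == 1 && insCnt == 1 && rvs.size == 2 && rvs.min == rvs.max
      if (removeCarryOver && isCarryOver) {
        Seq.empty[Change] // the carry-over Filter drops the whole group
      } else {
        // Assumed guard, standing in for UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION.
        if (detectUpdates && (delCnt > 1 || insCnt > 1)) {
          throw new IllegalStateException("UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION")
        }
        // Stand-in for Generate(Inline(...)): one output row per buffered input row.
        events.map { e =>
          if (detectUpdates && delCnt == 1 && insCnt == 1) {
            // Assumed relabel: the delete half is the pre-image, the insert half the post-image.
            e.copy(changeType =
              if (e.changeType == "delete") "update_preimage" else "update_postimage")
          } else e
        }
      }
    }
  }
}
```

For example, a group holding a delete and an insert with the same row version is dropped when `removeCarryOver` is set, while a delete/insert pair with different row versions comes back as an `update_preimage`/`update_postimage` pair when `detectUpdates` is set; with both flags off, every input row is re-emitted unchanged.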
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java:
##########
@@ -35,8 +35,34 @@
* {@code update_preimage}, or {@code update_postimage}</li>
 * <li>{@code _commit_version} (connector-defined type, e.g. LONG) — the version containing
 * this change</li>
- * <li>{@code _commit_timestamp} (TIMESTAMP) — the timestamp of the commit</li>
+ * <li>{@code _commit_timestamp} (TIMESTAMP) -- the timestamp of the commit. All rows
+ * belonging to a single {@code _commit_version} must share the same
+ * {@code _commit_timestamp}. For streaming reads with post-processing enabled,
+ * two additional requirements apply:
+ * <ol>
+ * <li>All rows of a single commit must appear in the same micro-batch (i.e.
+ * micro-batch boundaries align with commit boundaries).</li>
+ * <li>Distinct {@code _commit_version} values must have distinct
+ * {@code _commit_timestamp} values.</li>
+ * </ol>
+ * Streaming post-processing uses {@code _commit_timestamp} as event time with a
+ * zero-delay watermark, so once a micro-batch observes max event time T the
+ * global watermark advances to T. Both Spark's late-event filter and its
+ * state-eviction predicate then use {@code eventTime <= T} -- so any later row
+ * at exactly {@code _commit_timestamp = T} (whether from the same commit split
+ * across batches, or from a different commit that happens to share T) is
+ * silently dropped as late. Requirement 1 rules out the same-commit case;
+ * requirement 2 rules out the different-commit case. Atomic-commit CDC connectors
+ * (e.g. Delta versions, Iceberg snapshots) that derive {@code _commit_timestamp}
+ * from wall-clock time at commit time naturally satisfy both requirements.
+ * Behavior is undefined if {@code _commit_timestamp} is {@code NULL} on any row
Review Comment:
Done in e8db78a -- replaced the "behavior is undefined" wording with an
explicit statement that the row-level rewrite raises
`CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP` on a NULL
`_commit_timestamp`.
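The watermark behavior that both the Scaladoc walkthrough table and the new Javadoc requirements rely on can be simulated in a few lines. This is a simplified model under stated assumptions, not Spark's implementation: `CdcRow`, `MicroBatchSim.run`, and `Result` are hypothetical; the watermark in effect during a batch is the maximum `_commit_timestamp` seen in earlier batches (zero delay); input rows with `eventTime <= watermark` are dropped as late, and buffered groups with `eventTime <= watermark` are emitted; a final flush at end of input stands in for the end-of-stream step in the walkthrough. State is keyed by `(_commit_version, _commit_timestamp)` only, ignoring the row ID.

```scala
// Hypothetical row shape: only the fields the watermark logic looks at.
final case class CdcRow(commitVersion: Long, commitTs: Long, id: Int)

object MicroBatchSim {
  /** Commit keys emitted per batch (plus a final flush) and rows dropped as late. */
  final case class Result(emittedPerBatch: Seq[Seq[(Long, Long)]], droppedAsLate: Seq[CdcRow])

  /** Simplified zero-delay watermark over `_commit_timestamp`; not Spark's implementation. */
  def run(batches: Seq[Seq[CdcRow]]): Result = {
    var watermark: Option[Long] = None               // no watermark before batch 1
    var state = Map.empty[(Long, Long), Seq[CdcRow]] // buffered groups by (version, ts)
    val emitted = Seq.newBuilder[Seq[(Long, Long)]]
    val dropped = Seq.newBuilder[CdcRow]
    for (batch <- batches) {
      // Late-event filter: rows with eventTime <= watermark are dropped.
      val (late, onTime) = batch.partition(r => watermark.exists(wm => r.commitTs <= wm))
      dropped ++= late
      for (r <- onTime) {
        val key = (r.commitVersion, r.commitTs)
        state = state.updated(key, state.getOrElse(key, Seq.empty[CdcRow]) :+ r)
      }
      // Eviction: emit every buffered group whose eventTime <= watermark.
      val (ready, pending) =
        state.partition { case ((_, ts), _) => watermark.exists(wm => ts <= wm) }
      emitted += ready.keys.toSeq.sorted
      state = pending
      // Zero delay: the watermark advances to the max event time observed so far.
      watermark = (watermark.toSeq ++ batch.map(_.commitTs)).maxOption
    }
    emitted += state.keys.toSeq.sorted               // end-of-stream flush
    Result(emitted.result(), dropped.result())
  }
}
```

On the walkthrough's three commits (one per batch, timestamps 10, 20, 30), `emittedPerBatch` is empty for batch 1, then holds commit 1, commit 2, and finally commit 3 on the flush. Splitting commit 1 across two batches, or giving commit 2 the same timestamp 10 as commit 1, instead puts the second batch's `_commit_timestamp = 10` rows into `droppedAsLate`: these are the two cases the new requirements rule out.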
--
This is an automated message from the Apache Git Service.