gengliangwang opened a new pull request, #55636:
URL: https://github.com/apache/spark/pull/55636
### What changes were proposed in this pull request?
This PR implements row-level CDC post-processing (carry-over removal and
update detection) for DSv2 streaming reads. Previously, streaming `changes()`
rejected any post-processing with a blanket
`INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` error.
The batch path (added in #55508 and #55583) uses a Catalyst `Window` keyed
by `(rowId, _commit_version)`, which `UnsupportedOperationChecker` rejects on
streaming queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`). The streaming
rewrite in `ResolveChangelogTable` now expresses the same logic with
streaming-allowed primitives:
```
EventTimeWatermark(_commit_timestamp, 0s)
  -> Aggregate keyed by (rowId..., _commit_version, _commit_timestamp)
       (count_if delete/insert, [min/max/count rowVersion,]
        collect_list(struct(*)))
  -> [Filter on the carry-over predicate]
  -> Generate(Inline(events))
  -> [Project relabeling _change_type for delete+insert pairs]
  -> Project dropping __spark_cdc_* helpers
Including `_commit_timestamp` in the grouping keys is required to satisfy
the Append-mode streaming aggregation contract (the watermark attribute must
appear among the grouping expressions). By CDC convention all rows in a single
commit share `_commit_timestamp`, so the extra key produces the same groups as
the batch `(rowId, _commit_version)` grouping.
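The rewrite described above can be illustrated outside Spark with a minimal plain-Python sketch. It is not the Catalyst plan itself: the event field names (`row_id`, `row_version`, `data`), the change-type labels, and the carry-over predicate (a delete+insert pair whose row version is unchanged) are assumptions made for illustration only.

```python
from collections import defaultdict

# Hypothetical change-type labels, used only in this sketch.
DELETE, INSERT = "delete", "insert"
UPDATE_PRE, UPDATE_POST = "update_preimage", "update_postimage"


def post_process(events, drop_carryovers=True, compute_updates=True):
    """Simulate the per-commit rewrite on a list of change-event dicts."""
    # Aggregate: one group per (row_id, commit_version, commit_timestamp),
    # collecting the group's events much like collect_list(struct(*)).
    groups = defaultdict(list)
    for e in events:
        key = (e["row_id"], e["commit_version"], e["commit_timestamp"])
        groups[key].append(e)

    out = []
    for key in sorted(groups):
        group = groups[key]
        deletes = sum(1 for e in group if e["change_type"] == DELETE)
        inserts = sum(1 for e in group if e["change_type"] == INSERT)
        versions = {e["row_version"] for e in group}
        is_pair = deletes == 1 and inserts == 1

        # Filter: assumed carry-over predicate, a delete+insert pair whose
        # row version did not change within the commit.
        if drop_carryovers and is_pair and len(versions) == 1:
            continue

        # Inline the collected events back into rows, relabeling
        # delete+insert pairs as an update when requested.
        for e in group:
            row = dict(e)
            if compute_updates and is_pair:
                row["change_type"] = (
                    UPDATE_PRE if e["change_type"] == DELETE else UPDATE_POST
                )
            out.append(row)
    return out


def event(row_id, row_version, change_type, data):
    """Build a sample event; every sample row belongs to commit 5."""
    return {"row_id": row_id, "row_version": row_version,
            "commit_version": 5, "commit_timestamp": 100,
            "change_type": change_type, "data": data}


SAMPLE_EVENTS = [
    event(1, 1, DELETE, "a"), event(1, 1, INSERT, "a"),  # carry-over pair
    event(2, 1, DELETE, "b"), event(2, 5, INSERT, "B"),  # real update
    event(3, 5, INSERT, "c"),                            # plain insert
]
```

Calling `post_process(SAMPLE_EVENTS)` in this sketch drops the pair for row 1, relabels the pair for row 2, and passes the insert for row 3 through unchanged. Because every group is confined to a single commit, no state has to be carried from one commit version to the next.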
`deduplicationMode = netChanges` is still rejected -- net change computation
partitions by `rowId` alone and reasons over the entire requested range, which
is fundamentally cross-batch. The existing error class
`INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` is replaced with
the more specific `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED`,
which now names the offending option and points users at the supported
streaming alternatives.
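A small plain-Python sketch of why net changes are cross-batch: when events are keyed by row ID alone, changes from different commits can cancel each other, so a micro-batch that sees only part of the range emits rows that the full range would net away. The `net_changes` helper and its simplified netting rule (an insert followed later by a delete of the same row cancels out) are assumptions for illustration, not Spark's implementation.

```python
from collections import defaultdict


def net_changes(events):
    """Simplified netting keyed by row_id alone (illustrative rule only)."""
    by_row = defaultdict(list)
    for e in sorted(events, key=lambda e: e["commit_version"]):
        by_row[e["row_id"]].append(e)

    out = []
    for row_id in sorted(by_row):
        first, last = by_row[row_id][0], by_row[row_id][-1]
        # A row inserted and later deleted inside the range nets to nothing.
        if first["change_type"] == "insert" and last["change_type"] == "delete":
            continue
        out.append(last)
    return out


full_range = [
    {"row_id": 7, "commit_version": 1, "change_type": "insert"},
    {"row_id": 7, "commit_version": 3, "change_type": "delete"},
]

# Over the whole range, the row never needs to be reported.
whole = net_changes(full_range)

# Processed as two micro-batches, the first batch has already emitted the
# insert before the delete in version 3 is ever seen.
per_batch = net_changes(full_range[:1]) + net_changes(full_range[1:])
```

Here `whole` is empty while `per_batch` contains both events, which is the mismatch that per-commit grouping avoids for carry-over removal and update detection.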
Doc updates:
- `Changelog.java` clarifies that all rows of a single `_commit_version`
must share `_commit_timestamp`, and that streaming reads expect non-decreasing
`_commit_timestamp` across micro-batches.
- `Changelog.java` notes that `containsIntermediateChanges()` is
range-scoped, hence the streaming limitation for `netChanges`.
- `DataStreamReader.changes()` Scaladoc lists the `netChanges` streaming
limitation.
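
The `_commit_timestamp` contract in the first bullet can be expressed as a small checker. This is a plain-Python sketch under one assumed reading of the contract: "non-decreasing across micro-batches" is taken to mean that no timestamp in a later micro-batch is smaller than the largest timestamp seen in earlier ones. The function name and input shape are hypothetical.

```python
def check_commit_timestamps(micro_batches):
    """Return a list of contract violations found in the given micro-batches.

    Each micro-batch is a list of (commit_version, commit_timestamp) pairs.
    """
    violations = []
    timestamp_of = {}    # commit_version -> first timestamp seen for it
    previous_max = None  # largest timestamp seen in earlier micro-batches

    for index, batch in enumerate(micro_batches):
        for version, ts in batch:
            # Rule 1: all rows of one commit version share one timestamp.
            seen = timestamp_of.setdefault(version, ts)
            if seen != ts:
                violations.append(
                    f"version {version} has timestamps {seen} and {ts}"
                )
            # Rule 2 (assumed reading): later batches never go back in time.
            if previous_max is not None and ts < previous_max:
                violations.append(
                    f"batch {index} goes back in time: {ts} < {previous_max}"
                )
        if batch:
            batch_max = max(ts for _, ts in batch)
            if previous_max is None or batch_max > previous_max:
                previous_max = batch_max
    return violations
```

A connector author could run a check along these lines in tests against the changelog rows the connector emits; an empty result means the sketch found no violation.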
### Why are the changes needed?
Without this PR, any streaming CDC read against a connector that emits
copy-on-write (CoW) carry-over pairs (`containsCarryoverRows = true`) or represents updates as raw
delete+insert (`representsUpdateAsDeleteAndInsert = true`) raises an analysis
error, forcing users to fall back to batch reads. The batch-only restriction is
unnecessary for these passes -- they don't need cross-version state -- and it
surprises users since the same options work on batch reads.
### Does this PR introduce _any_ user-facing change?
Yes.
- Streaming `spark.readStream.changes(...)` now supports
`computeUpdates = true` and `deduplicationMode = dropCarryovers`. Previously
these threw `INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED`.
- The error class
`INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` is renamed to
`INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED` with a more specific
message. The new error fires only for `deduplicationMode = netChanges` on
streaming reads.
- `DataStreamReader.changes()` Scaladoc is updated accordingly.
- `Changelog.java` Javadoc clarifies the `_commit_timestamp` contract for
streaming.
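
The resulting acceptance rule can be summarized in a short plain-Python sketch. The function is hypothetical and only mirrors the behavior described in the bullets above; it is not the check in `ResolveChangelogTable`.

```python
def streaming_cdc_error(is_streaming, deduplication_mode):
    """Return the error class this sketch would report, or None if accepted."""
    # Only the combination of a streaming read and netChanges is rejected.
    if is_streaming and deduplication_mode == "netChanges":
        return "INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED"
    return None


# Streaming reads accept dropCarryovers; batch reads accept both modes.
accepted = [
    streaming_cdc_error(True, "dropCarryovers"),
    streaming_cdc_error(False, "dropCarryovers"),
    streaming_cdc_error(False, "netChanges"),
]
rejected = streaming_cdc_error(True, "netChanges")
```

`computeUpdates` is left out of the sketch because, per the description above, it no longer affects whether a streaming read is accepted.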
### How was this patch tested?
86 tests across 4 CDC suites (all passing):
- `ResolveChangelogTableStreamingPostProcessingSuite` (new, 5 tests) --
plan-shape assertions covering carry-over only, update detection only, both
fused, and the no-rewrite pass-through cases. Verifies the `EventTimeWatermark`
+ `Aggregate` + `Generate(Inline)` rewrite shape.
- `ChangelogResolutionSuite` -- the two existing streaming throw-tests are
flipped to plan-shape assertions; a new test covers the `netChanges` streaming
throw.
- `ResolveChangelogTablePostProcessingSuite` -- the existing streaming throw
test is updated to cover the `netChanges`-only case.
- `ChangelogEndToEndSuite` -- three new streaming end-to-end tests using
`InMemoryChangelogCatalog`: carry-over removal drops CoW pairs, update
detection relabels delete+insert as update, and `netChanges` throws.
Also confirmed `UnsupportedOperationsSuite` (216 tests) still passes -- the
rewritten plan does not contain `Window` or any other streaming-rejected
operator.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)