gengliangwang opened a new pull request, #55653:
URL: https://github.com/apache/spark/pull/55653
### What changes were proposed in this pull request?
Follow-up to #55636 addressing four post-merge review comments from @zikangh:
1. **Deduplicate `isCarryoverPair`.** The carry-over predicate (`_del_cnt =
1 AND _ins_cnt = 1 AND _rv_cnt = 2 AND _min_rv = _max_rv`) was duplicated
between the batch path's `addCarryOverPairFilter` and the streaming path's
inline filter. Extracted a shared `buildCarryOverPairPredicate` helper that
both paths now call.
2. **Mark the streaming row-level rewrite via attribute metadata rather than
helper column name.** `UnsupportedOperationChecker` previously detected the
rewrite by string-matching the `__spark_cdc_events` aggregate alias name.
Switched to a metadata marker
(`ResolveChangelogTable.streamingPostProcessingMarker`) attached to the alias's
output attribute -- mirroring the existing `EventTimeWatermark.delayKey` and
`SessionWindow.marker` patterns. The marker travels with the attribute through
optimization.
3. **Make `InMemoryChangelogMicroBatchStream` actually emit one micro-batch
per commit.** The previous implementation reported `latestOffset = rows.size -
1` so every test ran in a single micro-batch, leaving the multi-batch
correctness claims of #55636 (watermark progression, append-mode aggregate
eviction across batches, commit-by-commit emission) untested. The stream now
walks `_commit_version` to compute commit boundaries and advances one boundary
per `latestOffset()` call. End-to-end output is unchanged for tests that call
`processAllAvailable`.
4. **Expand streaming E2E coverage to mirror the batch suite.** New tests:
- composite rowId carry-over removal,
- composite rowId update detection (different tuples kept raw),
- multi-commit pipeline (carry-over + update detection across 4 commits),
- DELETE-all-rows and UPDATE-all-rows fixtures,
- append-only workload pass-through,
- no-op UPDATE labeled as update (rcv differs on pre/post),
- large carry-over removal (9 carry-over pairs + 1 real delete).
These run on the new commit-boundary-batched stream and so verify
multi-batch semantics, not just end-of-input flushes.
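For reviewers, the deduplication in item 1 can be pictured with a minimal plain-Scala sketch. It does not use Catalyst expressions and is not the code in this PR: `RowIdStats`, `CarryOver`, `batchFilter`, and `streamingFilter` are hypothetical names, and only the predicate itself is taken from the description above.

```scala
// Hypothetical per-rowId aggregate. Field names mirror the helper columns
// quoted in item 1; this case class is not part of Spark.
final case class RowIdStats(
    delCnt: Long,
    insCnt: Long,
    rvCnt: Long,
    minRv: Long,
    maxRv: Long)

object CarryOver {
  // Single definition of the predicate:
  // _del_cnt = 1 AND _ins_cnt = 1 AND _rv_cnt = 2 AND _min_rv = _max_rv
  def isCarryOverPair(s: RowIdStats): Boolean =
    s.delCnt == 1 && s.insCnt == 1 && s.rvCnt == 2 && s.minRv == s.maxRv

  // Two hypothetical call sites that reuse the same predicate instead of
  // each spelling out the condition.
  def batchFilter(groups: Seq[RowIdStats]): Seq[RowIdStats] =
    groups.filterNot(isCarryOverPair)

  def streamingFilter(groups: Iterator[RowIdStats]): Iterator[RowIdStats] =
    groups.filterNot(isCarryOverPair)
}
```

With one definition, a change to the condition cannot leave the batch and streaming paths disagreeing about which groups are dropped.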
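The commit-boundary batching from item 3 can be sketched in plain Scala as follows. This is a simplified model under stated assumptions, not the actual `InMemoryChangelogMicroBatchStream`: `ChangeRow`, `CommitBoundaries`, and `CommitBatchedOffsets` are hypothetical names, rows are assumed to arrive ordered by commit version, and offsets here are exclusive end indices rather than the test stream's real offset type.

```scala
// Hypothetical row: only the commit version matters for batching.
final case class ChangeRow(commitVersion: Long, payload: String)

object CommitBoundaries {
  // Exclusive end index of each run of consecutive rows that share a
  // commit version. Assumes rows are already ordered by commit version.
  def exclusiveEnds(rows: IndexedSeq[ChangeRow]): Vector[Int] =
    rows.indices
      .filter { i =>
        i == rows.size - 1 || rows(i).commitVersion != rows(i + 1).commitVersion
      }
      .map(_ + 1)
      .toVector
}

final class CommitBatchedOffsets(rows: IndexedSeq[ChangeRow]) {
  private val ends = CommitBoundaries.exclusiveEnds(rows)
  private var handedOut = 0 // number of boundaries exposed so far

  // Advances by one commit boundary per call and stays at the last
  // boundary once all commits are exposed. Returns 0 when there are no rows.
  def latestOffset(): Int = {
    if (handedOut < ends.size) handedOut += 1
    if (handedOut == 0) 0 else ends(handedOut - 1)
  }
}
```

In this model each `latestOffset()` call exposes exactly one more commit, so a driver that polls until the offset stops moving sees one micro-batch per commit instead of a single batch covering all rows.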
### Why are the changes needed?
@zikangh raised these on the merged PR. Each is independent and small;
bundled here so they can be reviewed and shipped together rather than scattered
across multiple follow-ups.
### Does this PR introduce _any_ user-facing change?
No. Items 1 and 2 are internal refactors, item 3 improves test infrastructure,
and item 4 adds test coverage. The behavior of streaming CDC reads is unchanged.
### How was this patch tested?
100 tests pass across the four CDC suites (was 92, +8 new streaming E2E):
- `ChangelogResolutionSuite`
- `ResolveChangelogTablePostProcessingSuite`
- `ResolveChangelogTableStreamingPostProcessingSuite`
- `ChangelogEndToEndSuite`
Also confirmed:
- `UnsupportedOperationsSuite` (216 tests) still passes after the
marker-based detection switch.
- `dev/lint-scala` (scalastyle + scalafmt) is clean on the changed files.
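
The marker-based detection from item 2, which `UnsupportedOperationsSuite` exercises, can be modeled with a small plain-Scala sketch. It deliberately avoids the Catalyst classes: `Attr`, `Marker`, and the key string are hypothetical stand-ins, and the real marker is `ResolveChangelogTable.streamingPostProcessingMarker`.

```scala
// Hypothetical stand-in for an attribute: a name plus boolean metadata.
final case class Attr(name: String, metadata: Map[String, Boolean] = Map.empty) {
  // Renaming copies the attribute, so its metadata travels with it.
  def withName(newName: String): Attr = copy(name = newName)
}

object Marker {
  // Hypothetical key, used only in this sketch.
  val Key: String = "example.streamingPostProcessing"

  def mark(a: Attr): Attr = a.copy(metadata = a.metadata + (Key -> true))

  // Old style: string match on the helper column name.
  def detectByName(a: Attr): Boolean = a.name == "__spark_cdc_events"

  // New style: look for the metadata marker instead of the name.
  def detectByMarker(a: Attr): Boolean = a.metadata.getOrElse(Key, false)
}
```

In this model, a renamed attribute is still detected through its marker, while the name-based check stops matching it.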
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)