AnishMahto opened a new pull request, #56311: URL: https://github.com/apache/spark/pull/56311
Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7

--------

This is a stacked PR. Review the incremental diff here: https://github.com/AnishMahto/spark/compare/SPARK-57152-SCD2-find-affected-rows...SPARK-57222-SCD2-decompose-affected-rows

--------

### What changes were proposed in this pull request?

**Preamble:** The SCD type 2 (SCD2) flow is a `foreachBatch` streaming query over an input change data feed. It reconciles the incoming change data onto a target table that follows SCD2 replication semantics. SCD2 flows also maintain an "auxiliary" table that tracks state for early-arriving, out-of-order events. Each microbatch must reconcile against this auxiliary table as well, and update the auxiliary table's state for future microbatches.
**Decompose affected rows:** Given the set of rows affected by the current microbatch (incoming rows in the microbatch, affected rows from the aux table, and affected rows from the target table), the first step in microbatch reconciliation is decomposing closed historical rows that the microbatch bisects. A closed historical row is a row in the target table with a non-null start-at and end-at. An incoming upsert or delete can land with a sequence value between an existing closed row's start-at and end-at (i.e., it is a late-arriving event), bisecting that row. Decomposing a closed row means splitting its closed interval into its left and right endpoints, called the decomposed head and tail of the original row, respectively. The head represents a past upsert event, and the tail represents a past delete event. Once a closed row is decomposed, each endpoint can either coalesce with other endpoints/events from the full set of affected rows to form a new historical row, or be demoted back to the aux table as a tombstone or no-op upsert.

**Drop redundant rows post-decomposition:** Decomposition transforms a single row into two synthetic child rows (the decomposition head and tail), and the incoming microbatch may make the resulting decomposition tail logically redundant. There may also be duplicate events by key and sequence, either both introduced in the same microbatch or split across the incoming microbatch and a past one. Post-decomposition is a good time to reconcile these duplicates, because at this point we have the maximal set of rows that will be merged back into the target and aux tables. We must drop redundant rows before reconciling the new start/end-ats for this decomposed set of microbatch-affected rows, so that:

1. Zero-width rows (start-at == end-at) never appear in the target table.
2. Each row's reconciled start/end-at is fully derived from at most one other row in the decomposed set.

### Why are the changes needed?

Needed for the core SCD2 reconciliation logic.

### Does this PR introduce _any_ user-facing change?

No, SCD2 is unreleased.

### How was this patch tested?

Unit tests added to `Scd2BatchProcessorSuite`.

### Was this patch authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.7
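To make the decomposition and redundancy-dropping steps under *What changes were proposed in this pull request?* more concrete, here is a minimal, hypothetical sketch in plain Python. It is not the PR's Scala implementation: `Event`, `HistoricalRow`, `is_bisected`, `decompose`, `decompose_affected`, `drop_redundant`, `rebuild_intervals`, and the `PRECEDENCE` tie-break are illustrative names. It assumes integer sequence values and that only incoming events can bisect a closed row. The two dropping rules (keep one event per key+sequence, and drop a delete that directly follows another delete for the same key) are assumptions about what "redundant" means, not the PR's exact criteria.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class Event:
    """One change event for a key (hypothetical model, not the PR's schema)."""
    key: str
    seq: int
    kind: str    # "upsert" or "delete"
    origin: str  # "incoming", "aux", or "decomposed"


@dataclass(frozen=True)
class HistoricalRow:
    """A target-table row; end_at is None while the row is still open."""
    key: str
    start_at: int
    end_at: Optional[int]

    @property
    def is_closed(self) -> bool:
        return self.end_at is not None


def is_bisected(row: HistoricalRow, incoming: List[Event]) -> bool:
    """True if an incoming event for the row's key lands strictly inside a closed row."""
    return row.is_closed and any(
        e.key == row.key and row.start_at < e.seq < row.end_at for e in incoming
    )


def decompose(row: HistoricalRow) -> Tuple[Event, Event]:
    """Split a closed row into its head (past upsert) and tail (past delete)."""
    assert row.end_at is not None, "only closed rows are decomposed"
    head = Event(row.key, row.start_at, "upsert", "decomposed")
    tail = Event(row.key, row.end_at, "delete", "decomposed")
    return head, tail


def decompose_affected(
    target_rows: List[HistoricalRow], incoming: List[Event], aux: List[Event]
) -> Tuple[List[HistoricalRow], List[Event]]:
    """Return (untouched target rows, full event set incl. decomposed heads/tails)."""
    untouched: List[HistoricalRow] = []
    events: List[Event] = list(incoming) + list(aux)
    for row in target_rows:
        if is_bisected(row, incoming):
            events.extend(decompose(row))
        else:
            untouched.append(row)
    return untouched, events


# Hypothetical tie-break when events collide on (key, seq): lower value wins.
PRECEDENCE = {"incoming": 0, "aux": 1, "decomposed": 2}


def drop_redundant(events: List[Event]) -> List[Event]:
    """Drop redundant events and return the rest sorted by (key, seq)."""
    # Rule 1 (assumption): one event per (key, seq); on a precedence tie, first seen wins.
    best: Dict[Tuple[str, int], Event] = {}
    for e in events:
        k = (e.key, e.seq)
        if k not in best or PRECEDENCE[e.origin] < PRECEDENCE[best[k].origin]:
            best[k] = e
    ordered = sorted(best.values(), key=lambda e: (e.key, e.seq))
    # Rule 2 (assumption): a delete right after another delete for the same key closes nothing.
    result: List[Event] = []
    for e in ordered:
        prev = result[-1] if result and result[-1].key == e.key else None
        if e.kind == "delete" and prev is not None and prev.kind == "delete":
            continue
        result.append(e)
    return result


def rebuild_intervals(events: List[Event]) -> List[HistoricalRow]:
    """Each upsert's end_at comes from exactly one other event: the next one for its key."""
    rows: List[HistoricalRow] = []
    for i, cur in enumerate(events):
        if cur.kind != "upsert":
            continue
        nxt = events[i + 1] if i + 1 < len(events) and events[i + 1].key == cur.key else None
        rows.append(HistoricalRow(cur.key, cur.seq, nxt.seq if nxt else None))
    return rows


# A late upsert at seq 15 bisects the closed row a:[10, 20); b:[5, 8) is untouched.
untouched, events = decompose_affected(
    target_rows=[HistoricalRow("a", 10, 20), HistoricalRow("b", 5, 8)],
    incoming=[Event("a", 15, "upsert", "incoming")],
    aux=[],
)
print(untouched)  # [HistoricalRow(key='b', start_at=5, end_at=8)]
print(rebuild_intervals(drop_redundant(events)))  # rows a:[10, 15) and a:[15, 20)
```

Because duplicates by (key, seq) are removed before `rebuild_intervals` runs, consecutive events for a key have strictly increasing sequence values, so no rebuilt row can have `start_at == end_at`, and each upsert's `end_at` is read from exactly one neighbouring event. That mirrors the two reasons given for dropping redundant rows before reconciling start/end-ats.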
