AnishMahto opened a new pull request, #56311:
URL: https://github.com/apache/spark/pull/56311

   Approved AutoCDC SPIP: 
https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
   
   --------
   
   This is a stacked PR. Review incremental diff here: 
https://github.com/AnishMahto/spark/compare/SPARK-57152-SCD2-find-affected-rows...SPARK-57222-SCD2-decompose-affected-rows
   
   --------
   
   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: 
https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: 
https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a 
faster review.
     7. If you want to add a new configuration, please read the guideline first 
for naming configurations in
        
'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the 
guideline first in
        'common/utils/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   **Preamble:**
   
   The SCD type 2 flow is a foreachBatch streaming query on an input 
change-data-feed, and is responsible for reconciling the incoming change data 
onto some target table that follows SCD2 replication semantics.
   
   SCD2 flows also maintain an "auxiliary" table to keep track of 
early-arriving out-of-order received events state. Each microbatch will need to 
reconcile against this auxiliary table as well, and update the auxiliary 
table's state appropriately for future microbatches.
   
   **Decompose affected rows:**
   
   Given the set of affected rows in the current microbatch execution - 
incoming rows in the microbatch, affected rows from aux table, affected rows 
from target table - the first step in microbatch reconciliation is decomposing 
closed historical rows that are being bisected by the microbatch.
   
   A closed historical row is a row in the target table that has a non-null 
start-at and end-at. It's possible an incoming upsert/delete in the microbatch 
lands with a sequence in between an existing closed row's start/end at (i.e is 
a late-arriving event), bisecting it.
   
   Decomposing a closed row means exactly this - bisecting the closed interval 
into a left and right end point, called the decomposed head and tail of the 
original closed row respectively. The head represents some past upsert event, 
the tail represents some past delete event. 
   
   Once a closed row is decomposed into its end points, it can either coalesce 
with other endpoints/events from the full set of affected rows to form a new 
historical row, or it can be demoted back to the aux table as a tombstone or 
no-op upsert.
   
   **Drop redundant rows post-decomposition:**
   
   Decomposition transforms a single row into two synthetic children rows 
(decomposition head and tail), and its possible the resulting decomposition 
tail is made logically redundant by the incoming microbatch. 
   
   It's also possible that there are duplicate events by key+sequencing, either 
both introduced in the same microbatch or across the incoming and a past 
microbatch. Post-decomposition is a good time to reconcile these duplicates, 
because by this point we have the maximal possible set of rows that will be 
merged back into the target/aux tables.
   
   We must drop redundant rows prior to reconciling the new start/end-ats for 
this decomposed set of microbatch-affected rows, to:
   1. Prevent zero width rows (start at == end at) from ever appearing in the 
target table.
   2. Reconciliation of a row's start/end at is fully derived from at most one 
other row in the decomposed set.
   
   ### Why are the changes needed?
   Needed for core SCD2 reconciliation logic.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No, SCD2 is unreleased.
   
   
   ### How was this patch tested?
   Unit tests added to `Scd2BatchProcessorSuite`.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   Co-authored with Claude Opus 4.7
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to