AnishMahto opened a new pull request, #56283:
URL: https://github.com/apache/spark/pull/56283

   
   ### What changes were proposed in this pull request?
   **Preamble**:
   
   The SCD type 2 (SCD2) flow is a `foreachBatch` streaming query over an input change data feed. It reconciles the incoming change data onto a target table that follows SCD2 replication semantics.
   
   SCD2 flows also maintain an "auxiliary" table that tracks the state of out-of-order events that arrived early. Each microbatch must also reconcile against this auxiliary table and update its state appropriately for future microbatches.
   
   **Find Affected Aux/Target Table Rows**
   
   After preprocessing the microbatch so that each incoming row has its startAt, endAt, and recordStartAt projected, the next step in reconciliation is to determine which existing rows in the auxiliary and target tables might either be affected by the incoming rows or affect the incoming rows themselves.
   
   A no-op upsert run row in the auxiliary table can be affected by the microbatch if an incoming row makes it no longer a no-op (i.e., the microbatch delivers an interleaving row that does change history-tracked columns). A tombstone in the auxiliary table can affect an incoming row if it now matches an upsert in the microbatch.
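A minimal sketch of when a no-op upsert stops being a no-op, assuming an upsert is a no-op when none of its history-tracked columns differ from the previous upsert for the same key in sequence order. The `noop_sequences` and `invalidated_noops` helpers and the `(sequence, values)` representation are hypothetical, not the PR's implementation; the sketch ignores deletes and tombstone matching.

```python
from typing import Dict, List, Tuple

# Hypothetical representation of one key's upserts: (sequence, {column: value}).
Upsert = Tuple[int, Dict[str, object]]


def noop_sequences(upserts: List[Upsert], tracked: List[str]) -> List[int]:
    """Return the sequences of upserts that leave every history-tracked column
    unchanged relative to the immediately preceding upsert in sequence order."""
    noops: List[int] = []
    prev = None
    for seq, values in sorted(upserts, key=lambda u: u[0]):
        if prev is not None and all(values.get(c) == prev.get(c) for c in tracked):
            noops.append(seq)
        prev = values
    return noops


def invalidated_noops(existing: List[Upsert], incoming: List[Upsert],
                      tracked: List[str]) -> List[int]:
    """Sequences that were no-ops before the microbatch but are not afterwards,
    i.e. no-op rows that an interleaving incoming row has made meaningful."""
    before = set(noop_sequences(existing, tracked))
    after = set(noop_sequences(existing + incoming, tracked))
    return sorted(before - after)
```

For example, with `v` as the only tracked column, upserts at sequences 1 and 5 that both set `v = "a"` make 5 a no-op. An incoming upsert at 3 with `v = "b"` interleaves between them, so `invalidated_noops` returns `[5]`; an incoming upsert at 3 with `v = "a"` changes nothing and returns `[]`.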
   
   A row in the target table can be affected by the microbatch if an incoming upsert turns that row into a no-op upsert, or if an incoming delete or upsert event terminates it: an active row (endAt=null) can become terminated, and an existing closed row can become bisected. Conversely, existing rows in the target table can dictate when an incoming upsert row should be considered closed.
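To make termination and bisection concrete, here is a minimal in-memory sketch over a single key's history. The `Row` type and `apply_event` function are hypothetical, not the PR's code; the sketch assumes sequences are unique per key and ignores no-op detection and the auxiliary table.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Row:
    start_at: int
    end_at: Optional[int]  # None means the row is still active
    values: Dict[str, object]


def apply_event(rows: List[Row], seq: int,
                values: Optional[Dict[str, object]]) -> List[Row]:
    """Apply one event at sequence `seq` to a single key's SCD2 history.

    `values` is a dict for an upsert and None for a delete. `rows` must be
    non-overlapping; sequences are assumed unique per key.
    """
    if any(r.start_at == seq for r in rows):
        raise ValueError(f"duplicate sequence {seq}")
    out: List[Row] = []
    covered = False
    for r in rows:
        if r.start_at < seq and (r.end_at is None or seq < r.end_at):
            covered = True
            # Terminate an active row, or cut a closed row, at `seq`.
            out.append(Row(r.start_at, seq, r.values))
            if values is not None:
                # An upsert bisects the old row: the new version inherits its end.
                out.append(Row(seq, r.end_at, values))
        else:
            out.append(r)
    if not covered and values is not None:
        # `seq` falls in a gap (before the first row, or after a delete):
        # the next existing row, if any, determines where the new row closes.
        later = [r.start_at for r in rows if r.start_at > seq]
        out.append(Row(seq, min(later) if later else None, values))
    return sorted(out, key=lambda r: r.start_at)
```

For a history `[1, 5) a`, `[5, null) b`, a delete at 7 terminates the active row as `[5, 7) b`, while an upsert at 3 bisects the closed row into `[1, 3) a` and `[3, 5)` with the new values. An upsert landing in a gap takes its endAt from the next existing row's startAt, which is one reading of how existing rows dictate when an incoming upsert is closed.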
   
   We take a practical, conservative approach to selecting the set of rows that could possibly affect or be affected by the microbatch. For each key, we retrieve all existing rows whose startAt comes after the oldest sequence in the incoming microbatch, as well as the nearest existing row in each of the target and auxiliary tables that comes before that oldest sequence.
   
   We chose this over a complex and expensive join that would determine exactly which rows are affected by or affecting the microbatch. In practice, it is uncommon to receive very old events out of order, so pulling in all existing rows that come after the oldest row in the microbatch generally yields a small result set.
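The conservative candidate selection for the target and auxiliary tables can be sketched in memory as follows. The `Existing` type, its `"target"`/`"aux"` labels, and `candidate_rows` are hypothetical; ties with the oldest incoming sequence are included as an extra conservative assumption.

```python
from typing import Dict, Iterable, List, NamedTuple, Tuple


class Existing(NamedTuple):
    table: str      # "target" or "aux" (hypothetical labels)
    key: str
    start_at: int


def candidate_rows(existing: Iterable[Existing],
                   incoming: Iterable[Tuple[str, int]]) -> List[Existing]:
    """Conservatively select existing rows that may affect, or be affected by,
    a microbatch given as (key, sequence) pairs."""
    # Oldest (smallest) incoming sequence per key.
    floor: Dict[str, int] = {}
    for key, seq in incoming:
        floor[key] = min(seq, floor.get(key, seq))

    selected: List[Existing] = []
    nearest_before: Dict[Tuple[str, str], Existing] = {}
    for row in existing:
        oldest = floor.get(row.key)
        if oldest is None:
            continue  # key is not touched by this microbatch
        if row.start_at >= oldest:
            selected.append(row)  # at or after the oldest incoming sequence
        else:
            # Keep only the latest row before the oldest sequence, per table and key.
            slot = (row.table, row.key)
            best = nearest_before.get(slot)
            if best is None or row.start_at > best.start_at:
                nearest_before[slot] = row
    return selected + list(nearest_before.values())
```

One plausible Spark mapping, offered only as an assumption, is a `groupBy` on the microbatch to compute each key's minimum sequence, a join against each table on the key, and a per-key window ordered by startAt to keep the single preceding row. The join condition only compares against one scalar per key, which is what keeps it cheaper than deciding exactly which rows interact.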
   
   
   ### Why are the changes needed?
   This is part of the core algorithm for AutoCDC SCD2 flows.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No, new feature.
   
   
   ### How was this patch tested?
   Unit tested in `Scd2BatchProcessorSuite`.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   Co-authored with Claude Opus 4.7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
