AnishMahto opened a new pull request, #56283:
URL: https://github.com/apache/spark/pull/56283
### What changes were proposed in this pull request?
**Preamble**:
The SCD type 2 flow is a foreachBatch streaming query over an input
change data feed. It is responsible for reconciling the incoming change data
onto a target table that follows SCD2 replication semantics.
SCD2 flows also maintain an "auxiliary" table that tracks state for
early-arriving, out-of-order events. Each microbatch must reconcile against
this auxiliary table as well, and update the auxiliary table's state
appropriately for future microbatches.
**Find Affected Aux/Target Table Rows**
After preprocessing the microbatch so that each incoming row has its
startAt, endAt, and recordStartAt projected, the next step in reconciliation is
to determine which existing rows in the auxiliary and target tables might be
affected by the incoming rows, or might themselves affect the incoming rows.
A no-op upsert run row in the auxiliary table can be affected by the
microbatch if an incoming row makes it no longer a no-op (i.e., the microbatch
delivers an interleaving row that does change history-tracked columns).
A tombstone in the auxiliary table can affect an incoming row if it now matches
an upsert in the microbatch.
A row in the target table can be affected by the microbatch if an incoming
upsert makes the target table's row a no-op upsert, or if an incoming
delete/upsert event terminates it. An active row (endAt=null) in the target
table could become terminated, or an existing closed row could become
bisected. Conversely, existing rows in the target table can dictate the point
at which an incoming upsert row should be considered closed.
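To illustrate the terminated and bisected cases, here is a minimal Python sketch. It is not the PR's implementation: the `Effect` enum, the function `effect_on_target_row`, and the use of integer sequences are all hypothetical, and the sketch assumes a row covers the half-open interval from startAt to endAt. It deliberately ignores the no-op upsert case and the converse case where an existing row closes an incoming one.

```python
from enum import Enum
from typing import Optional


class Effect(Enum):
    UNAFFECTED = "unaffected"
    TERMINATED = "terminated"  # an active row (endAt=null) receives an endAt
    BISECTED = "bisected"      # a closed row is split at the incoming sequence


def effect_on_target_row(
    start_at: int, end_at: Optional[int], incoming_seq: int
) -> Effect:
    """Classify how one incoming event affects one existing target row.

    Assumption: the row is valid over [start_at, end_at), with end_at=None
    meaning the row is still active.
    """
    if incoming_seq <= start_at:
        # The event does not fall inside this row's validity interval.
        return Effect.UNAFFECTED
    if end_at is None:
        return Effect.TERMINATED
    if incoming_seq < end_at:
        return Effect.BISECTED
    return Effect.UNAFFECTED
```

For example, under these assumptions an event at sequence 7 would terminate an active row starting at 5, and would bisect a closed row spanning 5 to 9.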
We take a practical, conservative approach to selecting the set of rows that
could affect or be affected by the microbatch. Per key, we retrieve all
existing rows whose startAt comes after the oldest sequence in the incoming
microbatch, as well as the first existing row in each of the target and
auxiliary tables that comes before that oldest sequence.
This avoids a very complex and expensive join to determine exactly which
rows affect or are affected by the microbatch. In practice it is uncommon to
receive very old events out of order, so pulling in all existing rows that
come after the oldest row in the microbatch will generally yield a very small
result set.
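The conservative selection described above can be sketched in plain Python. This is an illustrative sketch only, not the code in this PR: `ExistingRow`, `select_candidate_rows`, and `min_incoming_seq` are hypothetical names, sequences are assumed to be integers, and a row whose startAt equals the oldest incoming sequence is assumed to belong to the "after" set. The same function would be applied to the target table and the auxiliary table separately.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ExistingRow:
    key: str
    start_at: int            # sequence at which this row version begins
    end_at: Optional[int]    # None means the row is still active


def select_candidate_rows(
    existing: List[ExistingRow],
    min_incoming_seq: Dict[str, int],
) -> List[ExistingRow]:
    """Pick existing rows that may affect, or be affected by, the microbatch.

    min_incoming_seq maps each key in the microbatch to the oldest
    (smallest) sequence that the microbatch carries for that key.
    """
    by_key: Dict[str, List[ExistingRow]] = {}
    for row in existing:
        # Keys that do not appear in the microbatch cannot be involved.
        if row.key in min_incoming_seq:
            by_key.setdefault(row.key, []).append(row)

    selected: List[ExistingRow] = []
    for key, rows in by_key.items():
        threshold = min_incoming_seq[key]
        rows.sort(key=lambda r: r.start_at)
        before = [r for r in rows if r.start_at < threshold]
        at_or_after = [r for r in rows if r.start_at >= threshold]
        if before:
            # Keep only the closest predecessor of the oldest incoming event.
            selected.append(before[-1])
        selected.extend(at_or_after)
    return selected


target = [
    ExistingRow("k1", 1, 5),
    ExistingRow("k1", 5, 9),
    ExistingRow("k1", 9, None),
    ExistingRow("k2", 2, None),
]
picked = select_candidate_rows(target, {"k1": 7})
print([(r.start_at, r.end_at) for r in picked])  # [(5, 9), (9, None)]
```

With the oldest incoming sequence for `k1` at 7, the row starting at 1 is skipped, the row starting at 5 is kept as the closest predecessor, and the row starting at 9 is kept because it comes later. The set may include rows that turn out to be unaffected, which is the trade-off for avoiding the more precise join.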
### Why are the changes needed?
AutoCDC SCD2 core algorithm.
### Does this PR introduce _any_ user-facing change?
No, new feature.
### How was this patch tested?
Unit tested in `Scd2BatchProcessorSuite`.
### Was this patch authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]