rajabhishekmaurya opened a new pull request, #22380:
URL: https://github.com/apache/kafka/pull/22380
## Summary
`MirrorSourceTask` currently relies on the source consumer's
`auto.offset.reset` policy to recover from any `OffsetOutOfRangeException`.
In production that means the two most damaging real-world failure modes are
indistinguishable from a healthy stream:
1. **Log truncation** — the source log's start offset advances past the
replication position (retention policy, `kafka-delete-records.sh`, etc.).
With `auto.offset.reset=earliest` the consumer silently jumps forward; the
target cluster ends up with a gap that no metric or log line reflects.
2. **Topic reset** — the source topic is deleted and recreated, e.g. during
maintenance. The consumer's stored position is now beyond the new
`endOffset`, so it either throws repeatedly or, again with `earliest`,
silently resets its position without any signal.
This change makes `MirrorSourceTask` recognise both situations and handle
each one explicitly instead of deferring to the reset policy.
## Changes
A new `catch (OffsetOutOfRangeException e)` branch in `poll()` dispatches to a
new helper, `handleOutOfRangeOffsets(...)`:
* If `position > endOffset && beginningOffset == 0` for an out-of-range
partition, the source topic looks newly recreated. The task logs a `WARN`
with the timestamp and topic-partition, calls `consumer.seek(tp, 0L)`, and
resumes replication on the next poll.
* If `position < beginningOffset`, data we still needed to replicate has been
purged. The task logs an `ERROR` describing the gap (positions and the
number of records lost) and throws a `ConnectException`, surfacing the
failure to Connect's task lifecycle (fail-fast) instead of papering over
it.
* Any other shape of out-of-range is treated as unexpected and fails the
task, so the implementation degrades safely rather than silently.
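The three-way classification can be sketched without a Kafka dependency. This is a minimal illustration under stated assumptions, not the code in this PR. The names `OutOfRangeSketch`, `Action`, `Offsets`, `classify` and `handle` are hypothetical. Partitions are plain strings, and the three offsets are passed in as longs instead of being read from the consumer. `IllegalStateException` stands in for Connect's `ConnectException`. The sketch also assumes the handler fails on the first partition that must fail; the exception text reuses the log string quoted in the Testing section.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free sketch of the out-of-range handling described above.
final class OutOfRangeSketch {

    // The three outcomes for a partition that raised OffsetOutOfRangeException.
    enum Action { RESEEK_TO_ZERO, FAIL_TRUNCATION, FAIL_UNEXPECTED }

    // Hypothetical holder for the offsets the task would read from the consumer.
    static final class Offsets {
        final long position;
        final long beginning;
        final long end;

        Offsets(long position, long beginning, long end) {
            this.position = position;
            this.beginning = beginning;
            this.end = end;
        }
    }

    static Action classify(Offsets o) {
        if (o.position > o.end && o.beginning == 0L) {
            return Action.RESEEK_TO_ZERO;   // looks like a recreated topic
        }
        if (o.position < o.beginning) {
            return Action.FAIL_TRUNCATION;  // unreplicated records were purged
        }
        return Action.FAIL_UNEXPECTED;      // any other shape: fail rather than guess
    }

    // Returns the partitions to rewind to offset 0; throws on the first partition
    // that must fail the task (IllegalStateException stands in for ConnectException).
    static List<String> handle(Map<String, Offsets> outOfRange) {
        List<String> reseek = new ArrayList<>();
        for (Map.Entry<String, Offsets> e : outOfRange.entrySet()) {
            Offsets o = e.getValue();
            switch (classify(o)) {
                case RESEEK_TO_ZERO:
                    reseek.add(e.getKey());
                    break;
                case FAIL_TRUNCATION:
                    throw new IllegalStateException("Source log truncation detected for " + e.getKey()
                            + ": position " + o.position + " < beginningOffset " + o.beginning
                            + ", " + (o.beginning - o.position) + " offsets skipped");
                default:
                    throw new IllegalStateException("Unexpected out-of-range state for " + e.getKey()
                            + ": position " + o.position + ", beginningOffset " + o.beginning
                            + ", endOffset " + o.end);
            }
        }
        return reseek;
    }

    public static void main(String[] args) {
        Map<String, Offsets> outOfRange = new LinkedHashMap<>();
        outOfRange.put("commit-log-0", new Offsets(500L, 0L, 120L)); // recreated topic
        System.out.println(handle(outOfRange)); // [commit-log-0]
    }
}
```

Inside a real task, the inputs would plausibly come from `OffsetOutOfRangeException#partitions()` combined with `consumer.position(tp)`, `consumer.beginningOffsets(...)` and `consumer.endOffsets(...)`. Each partition that `handle` returns would then be passed to `consumer.seek(tp, 0L)`.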
`initializeConsumer(...)` was also tightened: partitions with no committed
offset now `seekToBeginning(...)` explicitly. This makes the task behave
correctly when operators choose `consumer.auto.offset.reset = none` to opt
into the truncation detection above (otherwise a first-time partition would
throw `NoOffsetForPartitionException` on first poll).
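In effect, the `initializeConsumer(...)` change splits the assigned partitions into two groups before the first poll. The sketch below is hypothetical. `InitialSeekPlan` and `plan` are illustrative names, partitions are strings, and a `null` loaded offset stands for "nothing committed". It also assumes the stored offset belongs to the last mirrored record, so a committed partition resumes at `offset + 1`.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: decide how each assigned partition is positioned at startup.
final class InitialSeekPlan {
    final Set<String> seekToBeginning = new LinkedHashSet<>();
    final Map<String, Long> seekTo = new LinkedHashMap<>();

    // A null value in `loaded` means "no committed offset" (an assumption of this sketch).
    static InitialSeekPlan plan(Map<String, Long> loaded) {
        InitialSeekPlan plan = new InitialSeekPlan();
        for (Map.Entry<String, Long> e : loaded.entrySet()) {
            if (e.getValue() == null) {
                // Seek explicitly: with auto.offset.reset=none, leaving the position
                // unset would make the first poll throw NoOffsetForPartitionException.
                plan.seekToBeginning.add(e.getKey());
            } else {
                // Assumes the stored offset is the last mirrored record, so resume after it.
                plan.seekTo.put(e.getKey(), e.getValue() + 1L);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Long> loaded = new LinkedHashMap<>();
        loaded.put("commit-log-0", 41L);
        loaded.put("commit-log-1", null);
        InitialSeekPlan p = plan(loaded);
        System.out.println(p.seekToBeginning + " " + p.seekTo); // [commit-log-1] {commit-log-0=42}
    }
}
```

The task would then call `consumer.seekToBeginning(...)` once for the first group and `consumer.seek(tp, offset)` for each entry in the second. Both are standard `KafkaConsumer` methods.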
Net change: one file, ~70 added / ~7 removed.
## Why the predicate is safe
The `beginningOffset == 0` condition in the reset branch is load-bearing:
ordinary retention-driven trimming always leaves `beginningOffset > 0`, so
retention can never be mistaken for a topic reset; it always falls into the
truncation branch. Conversely, a recreated topic that has not yet been written
past the task's stored position has both `beginningOffset == 0` *and* an
`endOffset` below that position, so the recreation case has a clean
discriminator.
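The argument above can be checked mechanically over small offset ranges. This hypothetical brute-force sketch (`PredicateCheck` and its methods are illustrative names) tests only the logical shape of the two predicates and says nothing about broker behaviour. It confirms that no state matches both the reset branch and the truncation branch. It also confirms that no state with `beginningOffset > 0`, which covers any log retention has trimmed, can match the reset branch.

```java
// Hypothetical brute-force check of the reset and truncation predicates.
final class PredicateCheck {
    static boolean looksRecreated(long pos, long begin, long end) {
        return pos > end && begin == 0L;
    }

    static boolean looksTruncated(long pos, long begin) {
        return pos < begin;
    }

    // Enumerates every (pos, begin, end) with 0 <= begin <= end <= max and
    // 0 <= pos <= max + 1; returns the number of states, throws if a claim fails.
    static int check(int max) {
        int states = 0;
        for (long begin = 0; begin <= max; begin++) {
            for (long end = begin; end <= max; end++) {   // a log always has begin <= end
                for (long pos = 0; pos <= max + 1; pos++) {
                    states++;
                    if (looksRecreated(pos, begin, end) && looksTruncated(pos, begin)) {
                        throw new IllegalStateException("branches overlap at " + pos + "," + begin + "," + end);
                    }
                    // Any log trimmed by retention has begin > 0.
                    if (begin > 0 && looksRecreated(pos, begin, end)) {
                        throw new IllegalStateException("trimmed log misread as reset");
                    }
                }
            }
        }
        return states;
    }

    public static void main(String[] args) {
        System.out.println("checked " + check(20) + " states"); // checked 5082 states
    }
}
```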
## Testing
* The new logic was exercised end-to-end with a Docker Compose harness (two
single-node KRaft clusters, this build of MM2, and a small producer) across
three scenarios:
1. Normal replication of 1000 records — verified by reading the end offset
of `primary.commit-log` on the target cluster.
2. Truncation — pause MM2, produce additional records on the source, run
`kafka-delete-records.sh` to lift the low watermark past MM2's last
position, unpause. Asserts that
`"Source log truncation detected"` appears in the task logs and that the
task transitions to FAILED.
3. Topic reset — stop MM2, delete and recreate the source topic, produce
fresh records, restart MM2. Asserts that
`"Source topic reset detected"` appears and that the new records arrive
on the target.
* All scenarios pass deterministically against `apache/kafka:4.0.0`.
* Existing MirrorMaker 2 integration tests are unaffected: no public API
changed, and the only execution path whose behaviour changes is the new
exception branch, which triggers only on a state that vanilla MM2 currently
absorbs silently.
## Compatibility
* No public API change.
* No new configuration property.
* Behaviour change is gated on the consumer raising
`OffsetOutOfRangeException`, which only happens when the operator sets
`consumer.auto.offset.reset = none`. With the default
(`auto.offset.reset = earliest`), `MirrorSourceTask` behaves exactly as
before.
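The opt-in can be expressed as a single check on the connector configuration. This is a hypothetical helper, not part of MM2. It reads only the `consumer.auto.offset.reset` key named in this PR and falls back to `earliest`, the default stated above. It ignores any other prefix under which the source consumer could be configured.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: is the new truncation/reset handling reachable for this config?
final class DetectionOptIn {
    // Key and default as given in this PR description; other prefixes are ignored.
    static final String RESET_KEY = "consumer.auto.offset.reset";
    static final String DEFAULT_RESET = "earliest";

    static boolean truncationDetectionActive(Map<String, String> connectorConfig) {
        // Only "none" lets OffsetOutOfRangeException reach MirrorSourceTask.
        return "none".equals(connectorConfig.getOrDefault(RESET_KEY, DEFAULT_RESET));
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(truncationDetectionActive(config)); // false: default is earliest
        config.put(RESET_KEY, "none");                        // operator opts in
        System.out.println(truncationDetectionActive(config)); // true
    }
}
```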
## Committer Checklist (excluded from commit message)
- [x] Verified design and implementation
- [x] Verified test coverage and CI build status
- [x] Verified documentation (including upgrade notes) — N/A, no public
surface change