rajabhishekmaurya opened a new pull request, #22380:
URL: https://github.com/apache/kafka/pull/22380

   ## Summary
   
   `MirrorSourceTask` currently relies on the source consumer's
   `auto.offset.reset` policy to recover from any `OffsetOutOfRangeException`.
   In production that means the two most damaging real-world failure modes are
   indistinguishable from a healthy stream:
   
   1. **Log truncation**: the source log's start offset advances past the
      replication position (retention policy, `kafka-delete-records.sh`, etc.).
      With `auto.offset.reset=earliest` the consumer silently jumps forward, and
      the target cluster ends up with a gap that no MM2 metric or task-level log
      line reflects.
   2. **Topic reset**: the source topic is deleted and recreated, e.g. during
      maintenance. The consumer's stored position is now beyond the new
      `endOffset`, so the consumer either throws repeatedly or, again with
      `earliest`, silently resets its position.
   
   This change makes `MirrorSourceTask` recognise both situations and handle
   each one explicitly instead of leaving it to the reset policy.
   
   ## Changes
   
   A new `catch (OffsetOutOfRangeException e)` branch in `poll()` dispatches to a
   new helper `handleOutOfRangeOffsets(...)`:
   
   * If `position > endOffset && beginningOffset == 0` for an out-of-range
     partition, the source topic looks newly recreated. The task logs a `WARN`
     with the timestamp and topic-partition, calls `consumer.seek(tp, 0L)`, and
     resumes replication on the next poll.
   * If `position < beginningOffset`, data we still needed to replicate has been
     purged. The task logs an `ERROR` describing the gap (positions and the
     number of records lost) and throws a `ConnectException`, surfacing the
     failure to Connect's task lifecycle (fail-fast) instead of papering over it.
   * Any other shape of out-of-range is treated as unexpected and fails the
     task, so the implementation degrades safely rather than silently.
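
   The per-partition decision made by `handleOutOfRangeOffsets(...)` can be
   sketched as a pure function. This is a minimal, dependency-free sketch rather
   than the PR's code: `OutOfRangeClassifier`, `OutOfRangeAction`, `classify(...)`
   and `recordsLost(...)` are hypothetical names, and the real helper acts on the
   consumer directly (seeking or throwing `ConnectException`) instead of
   returning a value.

   ```java
   /** Hypothetical, dependency-free model of the per-partition decision. */
   final class OutOfRangeClassifier {

       /** What the task does for one out-of-range partition. */
       enum OutOfRangeAction {
           /** Topic looks recreated: log a WARN and seek to offset 0. */
           SEEK_TO_ZERO,
           /** Unreplicated records were purged: log an ERROR and fail the task. */
           FAIL_TRUNCATION,
           /** Any other out-of-range shape: fail the task. */
           FAIL_UNEXPECTED
       }

       private OutOfRangeClassifier() {}

       /**
        * @param position        next offset the task would fetch for the partition
        * @param beginningOffset current log start offset of the partition
        * @param endOffset       current log end offset of the partition
        */
       static OutOfRangeAction classify(long position, long beginningOffset, long endOffset) {
           if (position > endOffset && beginningOffset == 0L) {
               return OutOfRangeAction.SEEK_TO_ZERO;
           }
           if (position < beginningOffset) {
               return OutOfRangeAction.FAIL_TRUNCATION;
           }
           return OutOfRangeAction.FAIL_UNEXPECTED;
       }

       /** Width of the purged offset range [position, beginningOffset); assumed to be the "records lost" figure. */
       static long recordsLost(long position, long beginningOffset) {
           return Math.max(0L, beginningOffset - position);
       }

       public static void main(String[] args) {
           // Recreated topic: the old position is 1000, the new topic only holds offsets 0..49.
           System.out.println(classify(1000L, 0L, 50L));    // SEEK_TO_ZERO
           // Retention moved the log start to 500 while the task was still at 300.
           System.out.println(classify(300L, 500L, 1200L)); // FAIL_TRUNCATION
           System.out.println(recordsLost(300L, 500L));     // 200
       }
   }
   ```

   Keeping the classification separate from the side effects (seek, log,
   throw) makes each branch easy to unit-test without a running broker.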
   
   `initializeConsumer(...)` was also tightened: partitions with no committed
   offset now `seekToBeginning(...)` explicitly. This makes the task behave
   correctly when operators choose `consumer.auto.offset.reset = none` to opt
   into the truncation detection above (otherwise a first-time partition would
   throw `NoOffsetForPartitionException` on first poll).
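
   The tightened start-up path can be sketched as follows. This is a
   hypothetical model, not the PR's code: partitions are plain strings, the
   `Seeker` interface only mirrors the two `Consumer` methods involved
   (`seek(TopicPartition, long)` and `seekToBeginning(Collection<TopicPartition>)`),
   `UNCOMMITTED = -1` is an assumed sentinel, and resuming at `offset + 1`
   assumes the stored offset is the last record already replicated.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical model of the start-up seeking in initializeConsumer(...), without Kafka types. */
   final class StartPositions {

       /** Hypothetical stand-in for the two Consumer methods used here. */
       interface Seeker {
           void seek(String topicPartition, long offset);

           void seekToBeginning(List<String> topicPartitions);
       }

       /** Hypothetical sentinel meaning "no offset has been committed for this partition". */
       static final long UNCOMMITTED = -1L;

       private StartPositions() {}

       /** Gives every assigned partition an explicit position, so auto.offset.reset=none never trips. */
       static void initialize(Map<String, Long> committedOffsets, Seeker seeker) {
           List<String> uncommitted = new ArrayList<>();
           committedOffsets.forEach((tp, offset) -> {
               if (offset == UNCOMMITTED) {
                   uncommitted.add(tp);
               } else {
                   // Assumption: the stored offset is the last record already replicated,
                   // so replication resumes with the record after it.
                   seeker.seek(tp, offset + 1L);
               }
           });
           if (!uncommitted.isEmpty()) {
               // First-time partitions start at the log start offset instead of
               // throwing NoOffsetForPartitionException on the first poll.
               seeker.seekToBeginning(uncommitted);
           }
       }

       public static void main(String[] args) {
           Map<String, Long> committed = new LinkedHashMap<>();
           committed.put("commit-log-0", 41L);
           committed.put("commit-log-1", UNCOMMITTED);
           initialize(committed, new Seeker() {
               @Override
               public void seek(String tp, long offset) {
                   System.out.println("seek " + tp + " -> " + offset); // seek commit-log-0 -> 42
               }

               @Override
               public void seekToBeginning(List<String> tps) {
                   System.out.println("seekToBeginning " + tps); // seekToBeginning [commit-log-1]
               }
           });
       }
   }
   ```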
   
   Net change: one file, ~70 added / ~7 removed.
   
   ## Why the predicate is safe
   
   The `beginningOffset == 0` condition in the reset branch is load-bearing:
   ordinary retention-driven trimming always leaves `beginningOffset > 0`, so
   retention can never be mistaken for a topic reset — it always falls into the
   truncation branch. Conversely, a recreated topic has both
   `beginningOffset == 0` *and* an `endOffset` lower than what we previously
   read, so the recreation case has a clean discriminator.
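
   The disjointness argument can be written out explicitly. Write p for the
   position, b for `beginningOffset` and e for `endOffset`, and assume offsets
   are non-negative with b ≤ e. A partition is out of range exactly when p < b
   or p > e:

   ```math
   \begin{aligned}
   \text{out of range} &\iff p < b \;\lor\; p > e \\
   b = 0 &\implies \lnot (p < b) \qquad \text{since } p \ge 0 \\
   b = 0 \,\land\, \text{out of range} &\implies p > e \qquad \text{(reset branch)} \\
   p < b &\implies b \ge 1 \qquad \text{(truncation branch)}
   \end{aligned}
   ```

   The reset and truncation branches therefore never match the same state, and
   any trimming that has removed at least one record leaves b ≥ 1, which rules
   out the reset branch. A recreated topic whose new log has already been
   trimmed (b ≥ 1 and p > e) matches neither predicate and takes the fail-fast
   path.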
   
   ## Testing
   
   * The logic was exercised end-to-end with a Docker Compose harness (two
     single-node KRaft clusters, this build of MM2 and a small producer). Three
     scenarios are run:
     1. Normal replication of 1000 records — verified by reading the end offset
        of `primary.commit-log` on the target cluster.
     2. Truncation — pause MM2, produce additional records on the source, run
        `kafka-delete-records.sh` to lift the low watermark past MM2's last
        position, unpause. Asserts that
        `"Source log truncation detected"` appears in the task logs and that the
        task transitions to FAILED.
     3. Topic reset — stop MM2, delete and recreate the source topic, produce
        fresh records, restart MM2. Asserts that
        `"Source topic reset detected"` appears and that the new records arrive
        on the target.
   * All scenarios pass deterministically against `apache/kafka:4.0.0`.
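   * Existing MirrorMaker 2 integration tests are unaffected. No public API
     changed, and the new exception branch is only reached in a state that
     vanilla MM2 currently absorbs silently; the explicit `seekToBeginning(...)`
     in `initializeConsumer(...)` is equivalent to what the default `earliest`
     policy already does for partitions without a committed offset.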
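
   Scenario 2 needs a delete-records target beyond MM2's position. The following
   sketch builds the JSON that `kafka-delete-records.sh --offset-json-file`
   accepts; `DeleteRecordsPlan`, the topic name `commit-log` and the offsets are
   hypothetical illustration values, not taken from the harness.

   ```java
   /** Hypothetical helper for scenario 2: content for kafka-delete-records.sh --offset-json-file. */
   final class DeleteRecordsPlan {

       private DeleteRecordsPlan() {}

       /**
        * Deletes every record below newLogStartOffset. The broker rejects offsets above the
        * partition's high watermark, so extra records must be produced before this is used.
        * Kafka topic names only contain [a-zA-Z0-9._-], so no JSON escaping is needed.
        */
       static String offsetJson(String topic, int partition, long newLogStartOffset) {
           return "{\"partitions\": [{\"topic\": \"" + topic + "\", \"partition\": " + partition
                   + ", \"offset\": " + newLogStartOffset + "}], \"version\": 1}";
       }

       public static void main(String[] args) {
           long mm2Position = 1000L; // next offset MM2 would fetch after replicating 1000 records
           // Requires at least 500 additional records on the source partition first.
           System.out.println(offsetJson("commit-log", 0, mm2Position + 500L));
       }
   }
   ```

   Because the new log start offset cannot exceed the high watermark, producing
   additional records while MM2 is paused is what makes it possible to lift the
   low watermark past MM2's position.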
   
   ## Compatibility
   
   * No public API change.
   * No new configuration property.
   * Behaviour change is gated on the consumer raising
     `OffsetOutOfRangeException`, which only happens when the operator sets
     `consumer.auto.offset.reset = none`. With the default
     (`auto.offset.reset = earliest`), `MirrorSourceTask` behaves exactly as
     before.
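
   The gating can be expressed as a small predicate. This is a hypothetical
   helper, not part of the PR: it consults only the `consumer.auto.offset.reset`
   key named here, takes `earliest` as the default per this description, and
   ignores any other consumer-override prefixes MM2 may also honour.

   ```java
   import java.util.Map;

   /** Hypothetical helper: is the new out-of-range handling reachable for this config? */
   final class TruncationDetectionGate {

       /** The effective default this PR describes for MM2's source consumer. */
       static final String DEFAULT_RESET = "earliest";

       private TruncationDetectionGate() {}

       static boolean detectionActive(Map<String, String> connectorConfig) {
           String reset = connectorConfig.getOrDefault("consumer.auto.offset.reset", DEFAULT_RESET);
           // "earliest" and "latest" let the consumer reposition itself; only "none"
           // surfaces OffsetOutOfRangeException to MirrorSourceTask.poll().
           return "none".equals(reset.trim());
       }

       public static void main(String[] args) {
           System.out.println(detectionActive(Map.of()));                                      // false
           System.out.println(detectionActive(Map.of("consumer.auto.offset.reset", "none"))); // true
       }
   }
   ```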
   
   ## Committer Checklist (excluded from commit message)
   - [x] Verified design and implementation
   - [x] Verified test coverage and CI build status
   - [x] Verified documentation (including upgrade notes): N/A, no public surface change

