TirumalaSrividya opened a new pull request, #22248:
URL: https://github.com/apache/kafka/pull/22248

   ## Problem
   
   MirrorMaker 2 silently skipped offsets in two failure scenarios:
   
   1. **Log Truncation (Data Loss):** When MM2 was paused and the source 
   topic's retention policy purged records that had not yet been mirrored, 
   MM2 would resume from its last committed offset and silently skip the 
   missing records. The gap went undetected, leaving the standby cluster 
   permanently out of sync.
   
   2. **Topic Reset:** When a source topic was deleted and recreated 
   (resetting offsets to 0), MM2 would continue from its previously 
   committed offset, skipping all records on the new topic from 0 to 
   the committed offset.
   
   Both scenarios were also undetectable after a full MM2 restart because 
   the in-memory offset tracker was lost.
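
To make the size of each silent skip concrete, here is a minimal sketch of the arithmetic, not code from the PR. The class and method names are hypothetical. It assumes a committed offset means "next offset to read", and for the reset case that the recreated topic has already grown past the committed offset.

```java
// Hypothetical helper for illustration only; it is not part of MirrorMaker 2.
final class SkippedRecordsSketch {

    private SkippedRecordsSketch() { }

    /**
     * Scenario 1 (log truncation): records in [committedOffset, earliestAvailableOffset)
     * were purged by retention before they could be mirrored.
     */
    static long lostToTruncation(long committedOffset, long earliestAvailableOffset) {
        // No loss if the committed offset is still retained on the broker.
        return Math.max(0L, earliestAvailableOffset - committedOffset);
    }

    /**
     * Scenario 2 (topic reset): offsets restart at 0, but the task resumes at
     * committedOffset, so records in [0, committedOffset) are never read.
     * Assumption: the new topic already holds at least committedOffset records.
     */
    static long skippedAfterReset(long committedOffset) {
        return Math.max(0L, committedOffset);
    }
}
```

For example, with a committed offset of 500 and an earliest available offset of 800, `lostToTruncation(500, 800)` reports 300 missing records.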
   
   ## Changes
   
   ### New file
   - `DataLossException.java` — typed exception extending `ConnectException`, 
     thrown on data loss detection as required by the spec
   
   ### MirrorSourceTask.java
   - Changed `consumer` field and test constructor from `KafkaConsumer` to 
     `Consumer` interface for testability with `MockConsumer`
   - Added `checkOffsetAnomaly()` — detects data loss (forward gap) and 
     topic reset (backward jump) on every polled record
   - Added `handleOffsetOutOfRange()` — secondary safety net when Kafka 
     throws `OffsetOutOfRangeException`
   - Added startup gap check in `initializeConsumer()` — compares committed 
     offset against broker's earliest available offset at startup, catching 
     data loss that occurred while MM2 was down
   - Extracted detection logic into separate methods to satisfy Checkstyle 
     cyclomatic complexity limit (max 16)
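
The detection rules listed above can be sketched roughly as follows. This is an illustrative, dependency-free approximation and not the PR's actual code: the class `OffsetAnomalySketch`, its method names, and the string partition keys are assumptions, and `DataLossException` here extends `RuntimeException` only as a stand-in for the PR's `ConnectException` subclass. The sketch also assumes offsets within a partition are contiguous, which does not hold for compacted topics or for topics containing transactional control records.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the PR's DataLossException (which extends Kafka Connect's ConnectException).
class DataLossException extends RuntimeException {
    DataLossException(String message) {
        super(message);
    }
}

// Hypothetical tracker; names and structure are assumptions for illustration.
final class OffsetAnomalySketch {

    enum Anomaly { NONE, TOPIC_RESET }

    // Next expected offset per partition, keyed by a "topic-partition" string.
    private final Map<String, Long> nextExpected = new HashMap<>();

    /** Startup check: the committed offset must still be available on the broker. */
    void checkStartup(String partition, long committedOffset, long earliestAvailableOffset) {
        if (earliestAvailableOffset > committedOffset) {
            throw new DataLossException("Data loss on " + partition + ": committed offset "
                    + committedOffset + " is behind earliest available offset "
                    + earliestAvailableOffset);
        }
        nextExpected.put(partition, committedOffset);
    }

    /** Per-record check: a forward gap is data loss, a backward jump is a topic reset. */
    Anomaly checkRecord(String partition, long offset) {
        Long expected = nextExpected.get(partition);
        if (expected != null && offset > expected) {
            // Forward gap: records in [expected, offset) were never seen.
            throw new DataLossException("Data loss on " + partition + ": expected offset "
                    + expected + " but received " + offset);
        }
        // Tracking always continues from the record just seen.
        nextExpected.put(partition, offset + 1);
        if (expected != null && offset < expected) {
            // Backward jump: the caller would seek this partition to the beginning.
            return Anomaly.TOPIC_RESET;
        }
        return Anomaly.NONE;
    }
}
```

In this sketch a caller would run `checkStartup` once per assigned partition and `checkRecord` for every polled record, letting `DataLossException` propagate to halt the task and reacting to `TOPIC_RESET` by seeking only the affected partition.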
   
   ### MirrorSourceTaskTest.java
   - `testDataLossDetectedAtStartup()` — verifies gap detection when broker 
     earliest offset is ahead of committed offset
   - `testTopicResetDoesNotDropOtherPartitions()` — verifies topic reset 
     recovery seeks to beginning without dropping other partition assignments
   - `testOffsetOutOfRangeThrowsDataLossException()` — verifies 
     `DataLossException` is thrown via the `OffsetOutOfRangeException` path
   
   ## Behaviour
   
   | Scenario | Before | After |
   |---|---|---|
   | Log truncation during replication | Silent skip | `DataLossException` thrown, MM2 halts |
   | Log truncation after MM2 restart | Undetected | `DataLossException` thrown at startup |
   | Topic deleted and recreated | Silent skip | `TOPIC RESET DETECTED`, seeks to beginning |

