TirumalaSrividya opened a new pull request, #22248:
URL: https://github.com/apache/kafka/pull/22248
## Problem
MirrorMaker 2 silently skipped offsets in two failure scenarios:
1. **Log Truncation (Data Loss):** When MM2 was paused and the source
topic's retention policy purged messages, MM2 would resume from its
last committed offset and silently skip the missing records. The gap
went undetected, leaving the standby cluster permanently out of sync.
2. **Topic Reset:** When a source topic was deleted and recreated
(resetting offsets to 0), MM2 would continue from its previously
committed offset, skipping all records on the new topic from 0 to
the committed offset.
Neither scenario could be detected after a full MM2 restart either,
because the offset tracker was held only in memory and was lost.
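
The two scenarios can be told apart by comparing the offset the task expects next with the offset of the record it actually receives. The following is a minimal sketch of that comparison, not the PR's implementation: `OffsetAnomaly`, `OffsetAnomalyClassifier`, and `classify` are hypothetical names chosen for illustration.

```java
// Hypothetical result type; not taken from the PR.
enum OffsetAnomaly { NONE, DATA_LOSS, TOPIC_RESET }

final class OffsetAnomalyClassifier {
    private OffsetAnomalyClassifier() { }

    /**
     * Classifies the relationship between the next offset the task expected
     * to read and the offset of the record that was actually polled.
     */
    static OffsetAnomaly classify(long expectedOffset, long actualOffset) {
        if (actualOffset > expectedOffset) {
            // Forward gap: records between expected and actual are missing.
            return OffsetAnomaly.DATA_LOSS;
        }
        if (actualOffset < expectedOffset) {
            // Backward jump: offsets restarted, as after delete and recreate.
            return OffsetAnomaly.TOPIC_RESET;
        }
        return OffsetAnomaly.NONE;
    }
}
```

Note that this sketch treats every forward gap as data loss. Offsets in a Kafka partition are not always contiguous (for example on compacted topics, or where transaction markers occupy offsets), so a production check would likely need to account for that.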
## Changes
### New file
- `DataLossException.java` — typed exception extending `ConnectException`,
thrown on data loss detection as required by the spec
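
A minimal sketch of what such an exception type might look like. The fields, constructor, and `missingRecords()` helper are hypothetical and not taken from the PR. `ConnectException` here is a local stand-in for `org.apache.kafka.connect.errors.ConnectException`, included only so the snippet compiles without Kafka on the classpath.

```java
// Local stand-in for org.apache.kafka.connect.errors.ConnectException,
// included only so this sketch compiles without Kafka on the classpath.
class ConnectException extends RuntimeException {
    ConnectException(String message) {
        super(message);
    }
}

// Hypothetical shape: the fields and constructor are assumptions, not the PR's code.
class DataLossException extends ConnectException {
    private final long committedOffset;
    private final long earliestAvailableOffset;

    DataLossException(String topicPartition, long committedOffset, long earliestAvailableOffset) {
        super(String.format(
            "Data loss on %s: committed offset %d is behind earliest available offset %d",
            topicPartition, committedOffset, earliestAvailableOffset));
        this.committedOffset = committedOffset;
        this.earliestAvailableOffset = earliestAvailableOffset;
    }

    // Number of records that can no longer be read from the source.
    long missingRecords() {
        return earliestAvailableOffset - committedOffset;
    }
}
```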
### MirrorSourceTask.java
- Changed the `consumer` field and the test constructor parameter from
`KafkaConsumer` to the `Consumer` interface so tests can use `MockConsumer`
- Added `checkOffsetAnomaly()` — detects data loss (forward gap) and
topic reset (backward jump) on every polled record
- Added `handleOffsetOutOfRange()` — secondary safety net when Kafka
throws `OffsetOutOfRangeException`
- Added startup gap check in `initializeConsumer()` — compares committed
offset against broker's earliest available offset at startup, catching
data loss that occurred while MM2 was down
- Extracted detection logic into separate methods to satisfy Checkstyle
cyclomatic complexity limit (max 16)
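
The startup gap check described above can be sketched as a pure comparison of two offset maps. This is an illustration under stated assumptions, not the code in the PR: `StartupGapCheck` and `findGaps` are hypothetical names, and partitions are keyed by plain strings instead of `TopicPartition` so the snippet runs without Kafka. In a real task the earliest offsets could come from `Consumer.beginningOffsets(...)`.

```java
import java.util.HashMap;
import java.util.Map;

final class StartupGapCheck {
    private StartupGapCheck() { }

    /**
     * Returns, for each partition whose committed offset is behind the
     * earliest offset still available on the broker, the number of records
     * that were purged before they could be replicated.
     */
    static Map<String, Long> findGaps(Map<String, Long> committed, Map<String, Long> earliest) {
        Map<String, Long> gaps = new HashMap<>();
        for (Map.Entry<String, Long> entry : committed.entrySet()) {
            Long earliestOffset = earliest.get(entry.getKey());
            // Skip partitions with no broker information; only flag real gaps.
            if (earliestOffset != null && earliestOffset > entry.getValue()) {
                gaps.put(entry.getKey(), earliestOffset - entry.getValue());
            }
        }
        return gaps;
    }
}
```

With committed offsets `{orders-0: 100, orders-1: 50}` and earliest offsets `{orders-0: 250, orders-1: 10}`, only `orders-0` is reported, with a gap of 150 records. A caller could then raise `DataLossException` for any non-empty result.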
### MirrorSourceTaskTest.java
- `testDataLossDetectedAtStartup()` — verifies gap detection when broker
earliest offset is ahead of committed offset
- `testTopicResetDoesNotDropOtherPartitions()` — verifies topic reset
recovery seeks to beginning without dropping other partition assignments
- `testOffsetOutOfRangeThrowsDataLossException()` — verifies
`DataLossException` is thrown via the `OffsetOutOfRangeException` path
## Behaviour
| Scenario | Before | After |
|---|---|---|
| Log truncation during replication | Silent skip | `DataLossException` thrown, MM2 halts |
| Log truncation after MM2 restart | Undetected | `DataLossException` thrown at startup |
| Topic deleted and recreated | Silent skip | `TOPIC RESET DETECTED`, seeks to beginning |