BK202503 opened a new pull request, #22535:
URL: https://github.com/apache/kafka/pull/22535
JIRA: [KAFKA-20666](https://issues.apache.org/jira/browse/KAFKA-20666)

### What

Kafka Connect offset storage uses `ByteBuffer` keys and values. The logical contents of a `ByteBuffer` are its remaining bytes (from `position()` to `limit()`), but three storage paths called `ByteBuffer.array()`:

- `KafkaOffsetBackingStore.set` (one call site, covering both key and value)
- `FileOffsetBackingStore.save` (two sites, key and value)
- `OffsetStorageReaderImpl.offsets` (one site, value)

`ByteBuffer.array()` ignores `position()`, `limit()`, and `arrayOffset()`, and it throws `UnsupportedOperationException` for direct buffers. As a result, a sliced or partially consumed buffer caused the store to write or read different bytes than the caller supplied, and a direct buffer crashed the store. Because Connect offset keys determine source partition identity, this could make a task fail to find its previous offsets on restart, duplicate work, skip records, or tombstone the wrong key.

This PR replaces the four `ByteBuffer.array()` call sites with `Utils.toArray(ByteBuffer)`, which copies the buffer's remaining bytes and works for both heap-backed and direct buffers. `Utils` was added to the import list of each touched file; no other code was reorganized.

### Tests

Added two regression tests that fail against the previous implementation and pass with the fix:

- `KafkaOffsetBackingStoreTest.testSetUsesByteBufferRemainingBytes` passes sliced `ByteBuffer.wrap("xkeyx", 1, 3)` and `ByteBuffer.wrap("xvaluex", 1, 5)` to `store.set` and verifies that the underlying `KafkaBasedLog.send` receives only the `key` and `value` bytes, not the surrounding `x` padding.
- `FileOffsetBackingStoreTest.testSaveRestoreUsesByteBufferRemainingBytes` persists a sliced offset, starts a new `FileOffsetBackingStore` against the same file, and asserts that the entry is recoverable under its logical `buffer("key")`.
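The standalone Java sketch below makes the failure mode from the What section concrete. `remainingBytes` is a hypothetical helper written only for this illustration. It copies `position()` through `limit()` via `duplicate()`, which is the behavior the fix relies on from `Utils.toArray(ByteBuffer)`, but it is not Kafka's implementation. The map round trip at the end is a simplified stand-in for a store that persists raw `byte[]` keys and re-wraps them on load. It is not the actual `FileOffsetBackingStore` code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class RemainingBytesDemo {

    // Hypothetical stand-in for the behavior of Utils.toArray(ByteBuffer): copy only the
    // bytes between position() and limit(). duplicate() has its own position, so the
    // caller's buffer is left untouched, and get(byte[]) works for heap and direct buffers.
    static byte[] remainingBytes(ByteBuffer buffer) {
        byte[] dest = new byte[buffer.remaining()];
        buffer.duplicate().get(dest);
        return dest;
    }

    static String str(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] padded = "xkeyx".getBytes(StandardCharsets.UTF_8);
        // position = 1, limit = 4: the logical key is "key"
        ByteBuffer key = ByteBuffer.wrap(padded, 1, 3);

        System.out.println(str(key.array()));          // xkeyx (whole backing array)
        System.out.println(str(remainingBytes(key)));  // key   (remaining bytes only)

        // A direct buffer has no accessible backing array, so array() throws.
        ByteBuffer direct = ByteBuffer.allocateDirect(3);
        direct.put("key".getBytes(StandardCharsets.UTF_8));
        direct.flip();
        try {
            direct.array();
        } catch (UnsupportedOperationException e) {
            System.out.println("array() threw UnsupportedOperationException");
        }
        System.out.println(str(remainingBytes(direct))); // key

        // Simulated persist-and-restore: raw bytes are stored, then wrapped again on load.
        // ByteBuffer.equals/hashCode compare remaining bytes, so a later lookup only
        // matches if exactly the remaining bytes were persisted.
        ByteBuffer lookup = ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8));

        Map<ByteBuffer, String> restoredViaArray = new HashMap<>();
        restoredViaArray.put(ByteBuffer.wrap(key.array()), "offset-42");
        System.out.println(restoredViaArray.containsKey(lookup));     // false

        Map<ByteBuffer, String> restoredViaRemaining = new HashMap<>();
        restoredViaRemaining.put(ByteBuffer.wrap(remainingBytes(key)), "offset-42");
        System.out.println(restoredViaRemaining.containsKey(lookup)); // true
    }
}
```

`ByteBuffer.equals` and `hashCode` are defined over the remaining bytes. For that reason, the persisted key has to be exactly those bytes for the restarted task's lookup to succeed. Copying through `duplicate()` also avoids mutating the buffer the caller passed in.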
### Validation

```
./gradlew :connect:runtime:test \
  --tests org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest.testSetUsesByteBufferRemainingBytes \
  --tests org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest.testGetSet \
  --tests org.apache.kafka.connect.storage.FileOffsetBackingStoreTest.testSaveRestoreUsesByteBufferRemainingBytes \
  --tests "org.apache.kafka.connect.storage.FileOffsetBackingStoreTest.*"
```

Both new tests pass, and the existing `KafkaOffsetBackingStoreTest.testGetSet` and the full `FileOffsetBackingStoreTest` suite still pass on JDK 17.

### Scope

This PR targets the Connect offset storage path. The sibling tickets in the same `ByteBuffer.array()` cluster are addressed separately so each change stays small and reviewable:

- KAFKA-20657 (`JsonConverter`): #22533
- KAFKA-20658 (`Cast` SMT): #22534
- KAFKA-20656 (`Struct.getBytes` / `equals`): coming as its own PR

### Committer Checklist

- [x] Verified design and implementation
- [x] Verified test coverage and CI build status
- [x] Verified documentation (including upgrade notes) updates (no user-facing surface change beyond bug-fix behavior)

--
This is an automated message from the Apache Git Service.
