BK202503 opened a new pull request, #22535:
URL: https://github.com/apache/kafka/pull/22535

   JIRA: [KAFKA-20666](https://issues.apache.org/jira/browse/KAFKA-20666)
   
   ### What
   
   Kafka Connect offset storage uses `ByteBuffer` keys and values. The logical 
contents of a `ByteBuffer` are its remaining bytes, but three storage paths 
called `ByteBuffer.array()`:
   
   - `KafkaOffsetBackingStore.set` (one site, both key and value)
   - `FileOffsetBackingStore.save` (two sites, key and value)
   - `OffsetStorageReaderImpl.offsets` (one site, value)
   
   `ByteBuffer.array()` returns the entire backing array, ignoring 
`position()`, `limit()`, and `arrayOffset()`, and it throws 
`UnsupportedOperationException` for buffers without an accessible backing 
array, such as direct buffers. So a sliced or partially consumed buffer wrote 
or read different bytes than the caller supplied, and a direct buffer crashed 
the store.
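
The mismatch can be reproduced with the JDK alone. The following is a minimal sketch, not code from this PR; the class and method names are hypothetical, and the `"xkeyx"` input simply mirrors the padding used in the regression tests.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of Kafka.
final class ArrayVersusRemainingSketch {
    /** A buffer whose logical contents are "key" inside the backing array "xkeyx". */
    static ByteBuffer slicedKey() {
        byte[] padded = "xkeyx".getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.wrap(padded, 1, 3); // position = 1, limit = 4
    }

    /** Returns false when array() throws UnsupportedOperationException. */
    static boolean arrayIsAccessible(ByteBuffer buffer) {
        try {
            buffer.array();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ByteBuffer key = slicedKey();
        // Logical length: only the bytes between position() and limit().
        System.out.println(key.remaining());
        // Backing array length: includes the surrounding padding.
        System.out.println(key.array().length);
        // Direct buffers typically expose no backing array at all.
        System.out.println(arrayIsAccessible(ByteBuffer.allocateDirect(3)));
    }
}
```

On a heap buffer the two lengths differ whenever the buffer is a slice or has been partially consumed, which is the case the old call sites mishandled.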
   
   Because Connect offset keys determine source partition identity, this could 
make a task fail to find its previous offsets on restart, duplicate work, skip 
records, or tombstone the wrong key.
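
To make the lookup failure concrete, here is a small sketch that stands in for an offset store with a plain `HashMap`. It is an assumption-laden illustration, not the Connect code: `saveWithArray` is a hypothetical method that mimics persisting `key.array()`, and the value `"offset-42"` is made up.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not part of Kafka.
final class OffsetKeyMismatchSketch {
    static ByteBuffer utf8(String text) {
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    /** Mimics a store that persists the whole backing array instead of the remaining bytes. */
    static Map<ByteBuffer, ByteBuffer> saveWithArray(ByteBuffer key, ByteBuffer value) {
        Map<ByteBuffer, ByteBuffer> store = new HashMap<>();
        store.put(ByteBuffer.wrap(key.array()), ByteBuffer.wrap(value.array()));
        return store;
    }

    public static void main(String[] args) {
        // The caller's logical key is "key"; the backing array is "xkeyx".
        ByteBuffer slicedKey = ByteBuffer.wrap("xkeyx".getBytes(StandardCharsets.UTF_8), 1, 3);
        Map<ByteBuffer, ByteBuffer> store = saveWithArray(slicedKey, utf8("offset-42"));

        // ByteBuffer.equals compares remaining bytes, so the sliced key equals "key"...
        System.out.println(slicedKey.equals(utf8("key")));
        // ...but the entry was stored under the padded bytes, so the logical lookup misses.
        System.out.println(store.containsKey(utf8("key")));
        System.out.println(store.containsKey(utf8("xkeyx")));
    }
}
```

A task that restarts and asks for `"key"` would find nothing in such a store, even though it wrote an offset under what it considered the same key.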
   
   Replaced the four `ByteBuffer.array()` call sites with 
`Utils.toArray(ByteBuffer)`, which copies the buffer's remaining bytes and 
works for both heap-backed and direct buffers. `Utils` was added to the import 
list of each touched file; no other code was reorganized.
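
For readers unfamiliar with the "remaining bytes" contract, the sketch below shows one way such a copy can be written with the JDK only. It is not the source of `Utils.toArray`; `copyRemaining` is a hypothetical helper that only illustrates the behavior described above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not the Kafka Utils implementation.
final class RemainingBytesSketch {
    /**
     * Copies the bytes between position() and limit() into a new array.
     * Reading through duplicate() leaves the caller's position untouched,
     * and relative get() works for heap and direct buffers alike.
     */
    static byte[] copyRemaining(ByteBuffer buffer) {
        byte[] dest = new byte[buffer.remaining()];
        buffer.duplicate().get(dest);
        return dest;
    }

    public static void main(String[] args) {
        // Heap buffer sliced to the logical contents "key".
        ByteBuffer heap = ByteBuffer.wrap("xkeyx".getBytes(StandardCharsets.UTF_8), 1, 3);
        System.out.println(new String(copyRemaining(heap), StandardCharsets.UTF_8));

        // Direct buffer narrowed to the same logical contents.
        ByteBuffer direct = ByteBuffer.allocateDirect(5);
        direct.put("xkeyx".getBytes(StandardCharsets.UTF_8));
        direct.position(1);
        direct.limit(4);
        System.out.println(new String(copyRemaining(direct), StandardCharsets.UTF_8));
    }
}
```

Copying through a duplicate is a design choice in this sketch: it avoids mutating a buffer that the caller may still be reading.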
   
   ### Tests
   
   Added two regression tests that fail against the previous implementation and 
pass against the fix:
   
   - `KafkaOffsetBackingStoreTest.testSetUsesByteBufferRemainingBytes` passes 
sliced buffers to `store.set`, wrapping the bytes of `"xkeyx"` at offset 1, 
length 3 and the bytes of `"xvaluex"` at offset 1, length 5, and verifies the 
underlying `KafkaBasedLog.send` receives only the `key` and `value` bytes, not 
the surrounding `x` padding.
   - `FileOffsetBackingStoreTest.testSaveRestoreUsesByteBufferRemainingBytes` 
persists a sliced offset, starts a new `FileOffsetBackingStore` against the 
same file, and asserts the entry is recoverable under its logical 
`buffer("key")`.
   
   ### Validation
   
   ```
   ./gradlew :connect:runtime:test \
     --tests org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest.testSetUsesByteBufferRemainingBytes \
     --tests org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest.testGetSet \
     --tests org.apache.kafka.connect.storage.FileOffsetBackingStoreTest.testSaveRestoreUsesByteBufferRemainingBytes \
     --tests "org.apache.kafka.connect.storage.FileOffsetBackingStoreTest.*"
   ```
   
   Both new tests pass, and the existing 
`KafkaOffsetBackingStoreTest.testGetSet` and the full 
`FileOffsetBackingStoreTest` suite still pass on JDK 17.
   
   ### Scope
   
   This PR targets the Connect offset storage path. The sibling tickets in the 
same `ByteBuffer.array()` cluster are addressed separately so each change stays 
small and reviewable:
   
   - KAFKA-20657 (`JsonConverter`) — #22533
   - KAFKA-20658 (`Cast` SMT) — #22534
   - KAFKA-20656 (`Struct.getBytes` / `equals`) — coming as its own PR.
   
   ### Committer Checklist
   
   - [x] Verified design and implementation
   - [x] Verified test coverage and CI build status
   - [x] Verified documentation (including upgrade notes) updates (no 
user-facing surface change beyond bug-fix behavior)
   

