dajac opened a new pull request, #22443:
URL: https://github.com/apache/kafka/pull/22443
This patch lets `TxnOffsetCommit` requests negotiate v6+ (KIP-1319)
when the producer's metadata cache knows the topic ids for every
topic in the offsets map.
Key changes:
- `TransactionManager` gains a `Metadata` reference (constructor
parameter wired from `KafkaProducer`). It is used to look up topic
ids when building a `TxnOffsetCommit` request.
- `txnOffsetCommitHandler` makes a single pass over the offsets map:
it populates `pendingTxnOffsetCommits`, builds each request topic
(via `computeIfAbsent` over a name -> topic map, with the topic id
resolved from `metadata.topicIds()`), and accumulates a snapshot
`Map<Uuid, String> topicNamesByIds` for the response handler. When
every topic has a non-zero id, the builder is chosen via
`forTopicIdsOrNames(...)` so v6 can be negotiated; otherwise it
falls back to `forTopicNames(...)` and caps at v5.
- The response handler resolves the topic name from the snapshot
(not the live metadata cache) when a v6+ response topic omits the
name on the wire. If the id is unknown to the snapshot, the
handler logs a warning and skips that topic so the retry loop is
unaffected.
Test changes:
- `prepareTxnOffsetCommitResponse` in `TransactionManagerTest` now
asserts that the inspected request is at version < 6 and that
every request topic carries a non-empty name. This strengthens the
existing v5-path coverage.
- Three new tests cover the v6 path. They seed the metadata cache
with a known topic id via `RequestTestUtils.metadataUpdateWithIds`
and then assert the request's negotiated version, topic ids, and
error handling:
`testTxnOffsetCommitNegotiatesV6WhenAllTopicIdsAreAvailable`,
`testTxnOffsetCommitDowngradesToV5WhenAnyTopicIdIsMissing`, and
`testTxnOffsetCommitRetriesOnUnknownTopicIdAtV6`.
`KafkaProducer.sendOffsetsToTransaction` will be updated in a
follow-up to refresh the metadata cache before delegating, so the
v6 negotiation actually fires for topics that the producer has not
seen yet.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]