lianetm commented on code in PR #19577:
URL: https://github.com/apache/kafka/pull/19577#discussion_r2445972000
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -732,7 +740,9 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
lastEpochSentOnCommit = Optional.empty();
}
- OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(data);
+ OffsetCommitRequest.Builder builder = canUseTopicIds
+ ? OffsetCommitRequest.Builder.forTopicIdsOrNames(data, true)
+ : OffsetCommitRequest.Builder.forTopicNames(data);
Review Comment:
If we don't have topic IDs in hand when committing offsets, I'm afraid we
cannot simply move on and generate a request `forTopicNames` using the last
stable version. As of v10 (not stable yet, but it will be eventually), the
`OffsetCommit` API supports topic IDs exclusively, so building with names like
this would fail, right?
https://github.com/apache/kafka/blob/92d9dc19b22529895c430bf2551b551cbfbcaad1/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java#L68-L74
This relates to the comment above: if we don't have topic IDs, we should
request metadata, but we also cannot really continue to generate requests. So,
should we just request metadata and return? No request would be generated in
that round; the requests would be generated on the next iteration of the
background thread, after the metadata is resolved. Thoughts?
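
To make the proposal concrete, here is a minimal, self-contained sketch of "request metadata and return". It does not use the real client classes: `CommitRequestSketch`, `FakeMetadata`, and `tryBuild` are hypothetical stand-ins, and `java.util.UUID` stands in for Kafka's `Uuid`. It only illustrates the control flow, assuming the caller retries on the next background-thread iteration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in types for illustration; not the actual Kafka client classes.
final class CommitRequestSketch {
    /** Zero UUID used as the "unknown topic ID" sentinel, mirroring Uuid.ZERO_UUID. */
    static final UUID ZERO = new UUID(0L, 0L);

    /** Minimal stand-in for the metadata component; records whether an update was requested. */
    static final class FakeMetadata {
        final Map<String, UUID> topicIds = new HashMap<>();
        boolean updateRequested = false;

        void requestUpdate() {
            updateRequested = true;
        }
    }

    /**
     * Returns the offsets keyed by topic ID if every topic resolves.
     * Otherwise requests a metadata update and returns empty, so that no
     * request is generated in this round.
     */
    static Optional<Map<UUID, Long>> tryBuild(Map<String, Long> offsetsByTopic, FakeMetadata metadata) {
        Map<UUID, Long> offsetsById = new HashMap<>();
        for (Map.Entry<String, Long> entry : offsetsByTopic.entrySet()) {
            UUID topicId = metadata.topicIds.getOrDefault(entry.getKey(), ZERO);
            if (topicId.equals(ZERO)) {
                // Topic ID unknown: ask for fresh metadata and skip this round.
                metadata.requestUpdate();
                return Optional.empty();
            }
            offsetsById.put(topicId, entry.getValue());
        }
        return Optional.of(offsetsById);
    }
}
```

In this sketch the caller treats an empty result as "nothing to send yet" and simply calls `tryBuild` again on the next poll, by which time the metadata may have been refreshed.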
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -700,15 +701,22 @@ class OffsetCommitRequestState extends RetriableRequestState {
}
public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
+ Map<String, Uuid> topicIds = metadata.topicIds();
+ boolean canUseTopicIds = true;
Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
TopicPartition topicPartition = entry.getKey();
OffsetAndMetadata offsetAndMetadata = entry.getValue();
+ Uuid topicId = topicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID);
+ if (topicId.equals(Uuid.ZERO_UUID)) {
+ canUseTopicIds = false;
Review Comment:
I expect we should request a metadata update in this case.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -752,11 +763,21 @@ public void onResponse(final ClientResponse response) {
boolean failedRequestRegistered = false;
for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
- TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+ // Version 10 drop topic name and support to topic id.
+ // We need to find offsetAndMetadata based on topic id and partition index only as
+ // topic name in the response will be emtpy.
+ // For older versions, topic id is zero, and we will find the offsetAndMetadata based on the topic name.
+ TopicPartition tp = (!Uuid.ZERO_UUID.equals(topic.topicId()) && metadata.topicNames().containsKey(topic.topicId())) ?
+ new TopicPartition(metadata.topicNames().get(topic.topicId()), partition.partitionIndex()) :
+ new TopicPartition(topic.name(), partition.partitionIndex());
Errors error = Errors.forCode(partition.errorCode());
if (error == Errors.NONE) {
OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+ if (offsetAndMetadata == null) {
+ throw new IllegalStateException("Can't find metadata for topic id " + topic.topicId() +
Review Comment:
I'm not convinced that this is what we should do here. At this point, the
commit request has succeeded on the broker. It's just that the response
contains a topic ID that the client cannot resolve, and the client only needs
the topic name to include it in a `log.debug`.
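
One possible alternative to throwing, sketched with hypothetical names (`CommitResponseSketch`, `describeTopic`) and `java.util.UUID` standing in for Kafka's `Uuid`: resolve the name when possible, and otherwise fall back to a label built from the topic ID, which is enough for a debug log line. This is an illustration of the idea only, not the PR's code.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical helper for illustration; not part of the Kafka client.
final class CommitResponseSketch {
    /** Zero UUID used as the "no topic ID" sentinel, mirroring Uuid.ZERO_UUID. */
    static final UUID ZERO = new UUID(0L, 0L);

    /**
     * Returns a label for logging a successful commit:
     * - the resolved topic name if the response carries a topic ID known to the client,
     * - a "topic id ..." label if the ID cannot be resolved (no exception is thrown),
     * - the name from the response if the topic ID is zero (older request versions).
     */
    static String describeTopic(UUID topicId, String nameInResponse, Map<UUID, String> topicNames) {
        if (!ZERO.equals(topicId)) {
            String resolved = topicNames.get(topicId);
            return resolved != null ? resolved : "topic id " + topicId;
        }
        return nameInResponse;
    }
}
```

With a helper along these lines, an unresolvable topic ID on a successful commit degrades the log message rather than failing the request.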