lianetm commented on code in PR #19577:
URL: https://github.com/apache/kafka/pull/19577#discussion_r2457216746
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -700,15 +701,22 @@ class OffsetCommitRequestState extends RetriableRequestState {
         }
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
+            Map<String, Uuid> topicIds = metadata.topicIds();
+            boolean canUseTopicIds = true;
             Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
             for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                 TopicPartition topicPartition = entry.getKey();
                 OffsetAndMetadata offsetAndMetadata = entry.getValue();
+                Uuid topicId = topicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID);
+                if (topicId.equals(Uuid.ZERO_UUID)) {
+                    canUseTopicIds = false;
Review Comment:
Requesting a metadata update here would only flip a flag, so that the
periodic metadata update knows a request is actually needed. Multiple calls
would just set that flag again; they would not send additional requests. I was
thinking of something similar to this:
https://github.com/apache/kafka/blob/6d16f687aa1a0df26f2f665436b7efaf0aec0c56/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java#L161-L166
Interesting point, though: this path (committing without having topic IDs)
can really only happen when we're committing manually assigned partitions. If
the consumer subscribed, it already has the topic IDs, because they come with
the assignment from the coordinator.
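
To illustrate the flag-flipping behaviour described above, here is a minimal, self-contained sketch. It is not the Kafka implementation: `MetadataSketch`, `CommitSketch`, `requestUpdate()` and `poll()` are hypothetical names chosen for this example, and `java.util.UUID` stands in for Kafka's `Uuid`. It only shows that marking the metadata as stale once per missing topic ID should result in at most one refresh on the next periodic poll.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for the consumer metadata; NOT Kafka's Metadata class.
class MetadataSketch {
    private final Map<String, UUID> topicIds = new HashMap<>();
    private boolean needsUpdate = false;
    private int requestsSent = 0;

    void putTopicId(String topic, UUID id) {
        topicIds.put(topic, id);
    }

    // Returns null when the topic ID is not known yet.
    UUID topicId(String topic) {
        return topicIds.get(topic);
    }

    // Only records that an update is needed; nothing is sent here,
    // so calling it repeatedly is harmless.
    void requestUpdate() {
        needsUpdate = true;
    }

    // Simulates the periodic metadata poll: sends one request if the flag
    // is set, then clears it. Returns the total number of requests sent.
    int poll() {
        if (needsUpdate) {
            requestsSent++;
            needsUpdate = false;
        }
        return requestsSent;
    }
}

// Hypothetical helper mirroring the loop in the diff above.
class CommitSketch {
    static boolean canUseTopicIds(MetadataSketch metadata, List<String> topics) {
        boolean canUseTopicIds = true;
        for (String topic : topics) {
            if (metadata.topicId(topic) == null) {
                canUseTopicIds = false;
                // May run once per unknown topic; each call just sets the same flag.
                metadata.requestUpdate();
            }
        }
        return canUseTopicIds;
    }
}
```

In this sketch, committing offsets for two unknown topics calls `requestUpdate()` twice, yet the next `poll()` sends a single request.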