skaundinya15 commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r663417944
########## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ##########
@@ -68,36 +74,117 @@ public Builder(String groupId,
             }

             this.data = new OffsetFetchRequestData()
-                .setGroupId(groupId)
-                .setRequireStable(requireStable)
-                .setTopics(topics);
+                            .setGroupId(groupId)
+                            .setRequireStable(requireStable)
+                            .setTopics(topics);
             this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
         }

         boolean isAllTopicPartitions() {
             return this.data.topics() == ALL_TOPIC_PARTITIONS;
         }

-        @Override
-        public OffsetFetchRequest build(short version) {
-            if (isAllTopicPartitions() && version < 2) {
-                throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
-                    "v" + version + ", but we need v2 or newer to request all topic partitions.");
-            }
+        public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
+                       boolean requireStable,
+                       boolean throwOnFetchStableOffsetsUnsupported) {
+            super(ApiKeys.OFFSET_FETCH);

-            if (data.requireStable() && version < 7) {
-                if (throwOnFetchStableOffsetsUnsupported) {
-                    throw new UnsupportedVersionException("Broker unexpectedly " +
-                        "doesn't support requireStable flag on version " + version);
+            List<OffsetFetchRequestGroup> groups = new ArrayList<>();
+            for (Entry<String, List<TopicPartition>> entry : groupIdToTopicPartitionMap.entrySet()) {
+                final List<OffsetFetchRequestTopics> topics;
+                if (groupIdToTopicPartitionMap.get(entry.getKey()) != null) {
+                    Map<String, OffsetFetchRequestTopics> offsetFetchRequestTopicMap =
+                        new HashMap<>();
+                    for (TopicPartition topicPartition : groupIdToTopicPartitionMap.get(entry.getKey())) {

Review comment:
Good point on storing it in the local variable, I will do that. But I think we still do want `entry.getKey()` in this situation, not `entry.getValue()`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org