rajinisivaram commented on a change in pull request #10954:
URL: https://github.com/apache/kafka/pull/10954#discussion_r662935021



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
##########
@@ -17,37 +17,51 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.List;
 
 /**
- * Options for {@link Admin#listConsumerGroupOffsets(String)}.
+ * Options for {@link Admin#listConsumerGroupOffsets(List)}.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsOptions extends 
AbstractOptions<ListConsumerGroupOffsetsOptions> {
 
-    private List<TopicPartition> topicPartitions = null;
+    private Map<String, List<TopicPartition>> groupToTopicPartitions = new 
HashMap<>();
 
     /**
-     * Set the topic partitions to list as part of the result.
-     * {@code null} includes all topic partitions.
-     *
-     * @param topicPartitions List of topic partitions to include
+     * Default constructor for {@code ListConsumerGroupOffsetsOptions}. Sets 
the topic partitions
+     * to fetch for each group id to {@code null}, which indicates to fetch 
all offsets for all
+     * topic partitions for that group.
+     * */
+    public ListConsumerGroupOffsetsOptions(List<String> groupIds) {

Review comment:
       This is a breaking change in a public API since it removes the default 
constructor. In any case, don't really want this in the constructor, we should 
add methods for whatever we need. Actually looking at the rest of the changes 
in this class, we are repurposing an existing public API by changing all of its 
methods, we need to completely rethink this change.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
##########
@@ -17,37 +17,51 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.List;
 
 /**
- * Options for {@link Admin#listConsumerGroupOffsets(String)}.
+ * Options for {@link Admin#listConsumerGroupOffsets(List)}.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsOptions extends 
AbstractOptions<ListConsumerGroupOffsetsOptions> {
 
-    private List<TopicPartition> topicPartitions = null;
+    private Map<String, List<TopicPartition>> groupToTopicPartitions = new 
HashMap<>();
 
     /**
-     * Set the topic partitions to list as part of the result.
-     * {@code null} includes all topic partitions.
-     *
-     * @param topicPartitions List of topic partitions to include
+     * Default constructor for {@code ListConsumerGroupOffsetsOptions}. Sets 
the topic partitions
+     * to fetch for each group id to {@code null}, which indicates to fetch 
all offsets for all
+     * topic partitions for that group.
+     * */
+    public ListConsumerGroupOffsetsOptions(List<String> groupIds) {
+        for (String group : groupIds) {
+            groupToTopicPartitions.put(group, null);
+        }
+    }
+
+    /**
+     * Set the topic partitions for each group we want to fetch offsets for as 
part of the result.
+     * {@code null} mapping for a specific group id means to fetch offsets for 
all topic
+     * partitions for that specific group.
+     * @param groupToTopicPartitions Map of group id to list of topic 
partitions to fetch offsets
+     *                              for.
      * @return This ListGroupOffsetsOptions
      */
-    public ListConsumerGroupOffsetsOptions 
topicPartitions(List<TopicPartition> topicPartitions) {
-        this.topicPartitions = topicPartitions;
+    public ListConsumerGroupOffsetsOptions groupToTopicPartitions(Map<String, 
List<TopicPartition>> groupToTopicPartitions) {

Review comment:
       We are using options in an inconsistent way here compared to other APIs. 
A good example to follow would be:
   ```
   public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> 
topicPartitionOffsets,
                                                             ListOffsetsOptions 
options)
   ```
   Options here are additional options that apply to the request. Data for the 
request comes from the first argument. We could do something similar for 
listConsumerGroupOffsets.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -109,6 +113,7 @@
     private AtomicBoolean asyncCommitFenced;
     private ConsumerGroupMetadata groupMetadata;
     private final boolean throwOnFetchStableOffsetsUnsupported;
+    private volatile boolean batchFetchOffsets = true;

Review comment:
       I can see this is based on the pattern used in the recent changes to 
FindCoordinator. But I don't see the point of it. If one broker that is the 
coordinator at any point in time wasn't upgraded, we switch to non-batched mode 
and stay there forever until the consumer is restarted. Given that consumers 
don't need batching, it seems to me that we could avoid any changes in 
consumers and just update `OffsetFetchRequest` to do the right thing based on 
versions. @dajac Since you reviewed the PR for FindCoordinators, what do you 
think?

##########
File path: clients/src/main/resources/common/message/OffsetFetchResponse.json
##########
@@ -30,30 +30,57 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 adds pending offset commit as new error response on partition 
level.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", 
"ignorable": true,
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
-    { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": 
"0+", 
+    { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": 
"0-7",
       "about": "The responses per topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+      { "name": "Name", "type": "string", "versions": "0-7", "entityType": 
"topicName",
         "about": "The topic name." },
-      { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", 
"versions": "0+",
+      { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", 
"versions": "0-7",
         "about": "The responses per partition", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+        { "name": "PartitionIndex", "type": "int32", "versions": "0-7",
           "about": "The partition index." },
-        { "name": "CommittedOffset", "type": "int64", "versions": "0+",
+        { "name": "CommittedOffset", "type": "int64", "versions": "0-7",
           "about": "The committed message offset." },
-        { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", 
"default": "-1",
+        { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", 
"default": "-1",
           "ignorable": true, "about": "The leader epoch." },
-        { "name": "Metadata", "type": "string", "versions": "0+", 
"nullableVersions": "0+",
+        { "name": "Metadata", "type": "string", "versions": "0-7", 
"nullableVersions": "0-7",
           "about": "The partition metadata." },
-        { "name": "ErrorCode", "type": "int16", "versions": "0+",
+        { "name": "ErrorCode", "type": "int16", "versions": "0-7",
           "about": "The error code, or 0 if there was no error." }
       ]}
     ]},
-    { "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", 
"ignorable": true,
-      "about": "The top-level error code, or 0 if there was no error." }
+    { "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", 
"ignorable": true,
+      "about": "The top-level error code, or 0 if there was no error." },
+    {"name": "GroupIds", "type": "[]OffsetFetchResponseGroup", "versions": 
"8+",
+      "about": "The responses per group id.", "fields": [
+      { "name": "groupId", "type": "string", "versions": "8+", "entityType": 
"groupId",
+        "about": "The group ID." },
+      { "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": 
"8+",
+        "about": "The responses per topic.", "fields": [
+        { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",

Review comment:
       Can't be 0+ for a field added in 8+?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -888,22 +888,26 @@ default ListConsumerGroupsResult listConsumerGroups() {
     }
 
     /**
-     * List the consumer group offsets available in the cluster.
-     *
+     * List the consumer group offsets available in the cluster for the given 
list of consumer
+     * groups.
+     * @param groupIds List of consumer group ids to list offsets for.
      * @param options The options to use when listing the consumer group 
offsets.
      * @return The ListGroupOffsetsResult
      */
-    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, 
ListConsumerGroupOffsetsOptions options);
+    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(List<String> 
groupIds, ListConsumerGroupOffsetsOptions options);
 
     /**
      * List the consumer group offsets available in the cluster with the 
default options.
      * <p>
-     * This is a convenience method for {@link 
#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with 
default options.
+     * This is a convenience method for
+     * {@link #listConsumerGroupOffsets(List, 
ListConsumerGroupOffsetsOptions)} with
+     * default options.
      *
+     * @param groupIds List of consumer group ids to list offsets for.
      * @return The ListGroupOffsetsResult.
      */
-    default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String 
groupId) {
-        return listConsumerGroupOffsets(groupId, new 
ListConsumerGroupOffsetsOptions());
+    default ListConsumerGroupOffsetsResult 
listConsumerGroupOffsets(List<String> groupIds) {

Review comment:
       We need to keep the old method as before and deprecate.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
##########
@@ -17,33 +17,62 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Map;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
 
 /**
- * The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
+ * The result of the {@link Admin#listConsumerGroupOffsets(List)} call.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsResult {
 
-    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+    final Map<CoordinatorKey, KafkaFutureImpl<Map<TopicPartition, 
OffsetAndMetadata>>> futures;
 
-    ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>> future) {
-        this.future = future;
+    ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, 
KafkaFutureImpl<Map<TopicPartition,
+        OffsetAndMetadata>>> futures) {
+        this.futures = futures;
     }
 
     /**
-     * Return a future which yields a map of topic partitions to 
OffsetAndMetadata objects.
-     * If the group does not have a committed offset for this partition, the 
corresponding value in the returned map will be null.
+     * Return a future which yields a map of group ids to a map of topic 
partitions to
+     * OffsetAndMetadata objects. If the group doesn't have a committed offset 
for a specific
+     * partition, the corresponding value in the returned map for that group 
id will be null.
      */
-    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> 
partitionsToOffsetAndMetadata() {

Review comment:
       We need to keep this public method and deprecate. Perhaps throw an 
exception if multiple group ids were specified and retain existing behaviour 
for single group id.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -888,22 +888,26 @@ default ListConsumerGroupsResult listConsumerGroups() {
     }
 
     /**
-     * List the consumer group offsets available in the cluster.
-     *
+     * List the consumer group offsets available in the cluster for the given 
list of consumer
+     * groups.
+     * @param groupIds List of consumer group ids to list offsets for.
      * @param options The options to use when listing the consumer group 
offsets.
      * @return The ListGroupOffsetsResult
      */
-    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, 
ListConsumerGroupOffsetsOptions options);
+    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(List<String> 
groupIds, ListConsumerGroupOffsetsOptions options);

Review comment:
       +1

##########
File path: clients/src/main/resources/common/message/OffsetFetchRequest.json
##########
@@ -31,19 +31,33 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 is adding the require stable flag.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups at a 
time
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
+    { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": 
"groupId",
       "about": "The group to fetch offsets for." },
-    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", 
"nullableVersions": "2+",
+    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": 
"0-7", "nullableVersions": "2-7",
       "about": "Each topic we would like to fetch offsets for, or null to 
fetch offsets for all topics.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+      { "name": "Name", "type": "string", "versions": "0-7", "entityType": 
"topicName",
         "about": "The topic name."},
-      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
+      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
         "about": "The partition indexes we would like to fetch offsets for." }
     ]},
+    { "name": "GroupIds", "type": "[]OffsetFetchRequestGroup", "versions": 
"8+",
+      "about": "Each group we would like to fetch offsets for", "fields": [
+      { "name": "groupId", "type": "string", "versions": "8+", "entityType": 
"groupId",
+        "about": "The group ID."},
+      { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": 
"8+", "nullableVersions": "8+",
+        "about": "Each topic we would like to fetch offsets for, or null to 
fetch offsets for all topics.", "fields": [
+        { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",

Review comment:
       Can't be 0+ for a field added in 8+?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to