lianetm commented on code in PR #19642:
URL: https://github.com/apache/kafka/pull/19642#discussion_r2073862478


##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java:
##########
@@ -139,40 +139,52 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, 
OffsetAndMetadata>> handleR
     ) {
         validateKeys(groupIds);
 
-        final OffsetFetchResponse response = (OffsetFetchResponse) 
abstractResponse;
+        var response = (OffsetFetchResponse) abstractResponse;
+        var completed = new HashMap<CoordinatorKey, Map<TopicPartition, 
OffsetAndMetadata>>();
+        var failed = new HashMap<CoordinatorKey, Throwable>();
+        var unmapped = new ArrayList<CoordinatorKey>();
 
-        Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed 
= new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
         for (CoordinatorKey coordinatorKey : groupIds) {
-            String group = coordinatorKey.idValue;
-            if (response.groupHasError(group)) {
-                handleGroupError(CoordinatorKey.byGroupId(group), 
response.groupLevelError(group), failed, unmapped);
+            var groupId = coordinatorKey.idValue;
+            var group = response.group(groupId);
+            var error = Errors.forCode(group.errorCode());
+
+            if (error != Errors.NONE) {
+                handleGroupError(
+                    coordinatorKey,
+                    error,
+                    failed,
+                    unmapped
+                );
             } else {
-                final Map<TopicPartition, OffsetAndMetadata> 
groupOffsetsListing = new HashMap<>();
-                Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData = response.partitionDataMap(group);
-                for (Map.Entry<TopicPartition, 
OffsetFetchResponse.PartitionData> partitionEntry : responseData.entrySet()) {
-                    final TopicPartition topicPartition = 
partitionEntry.getKey();
-                    OffsetFetchResponse.PartitionData partitionData = 
partitionEntry.getValue();
-                    final Errors error = partitionData.error;
-
-                    if (error == Errors.NONE) {
-                        final long offset = partitionData.offset;
-                        final String metadata = partitionData.metadata;
-                        final Optional<Integer> leaderEpoch = 
partitionData.leaderEpoch;
-                        // Negative offset indicates that the group has no 
committed offset for this partition
-                        if (offset < 0) {
-                            groupOffsetsListing.put(topicPartition, null);
+                var offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
+
+                group.topics().forEach(topic -> {

Review Comment:
   nit: I expect we don't need this {



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##########
@@ -261,20 +114,53 @@ public OffsetFetchResponse(List<OffsetFetchResponseGroup> 
groups, short version)
     public OffsetFetchResponse(OffsetFetchResponseData data, short version) {
         super(ApiKeys.OFFSET_FETCH);
         this.data = data;
-        // for version 2 and later use the top-level error code (in 
ERROR_CODE_KEY_NAME) from the response.
-        // for older versions there is no top-level error in the response and 
all errors are partition errors,
-        // so if there is a group or coordinator error at the partition level 
use that as the top-level error.
-        // this way clients can depend on the top-level error regardless of 
the offset fetch version.
-        // we return the error differently starting with version 8, so we will 
only populate the
-        // error field if we are between version 2 and 7. if we are in version 
8 or greater, then
-        // we will populate the map of group id to error codes.
+        this.version = version;
+    }
+
+    private Map<String, OffsetFetchResponseData.OffsetFetchResponseGroup> 
groups = null;

Review Comment:
   very nice building block here. Maybe worth a comment about how this 
structure now keeps normalized data per group (no matter the request version 
with/without data and errors per group)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to