[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535308#comment-16535308 ]

Yishun Guan commented on KAFKA-6788:

Hi, should I work on this issue from scratch, then?

> Grouping consumer requests per consumer coordinator in admin client
> ---
>
> Key: KAFKA-6788
> URL: https://issues.apache.org/jira/browse/KAFKA-6788
> Project: Kafka
> Issue Type: Improvement
> Components: admin
> Reporter: Guozhang Wang
> Assignee: Yishun Guan
> Priority: Major
> Labels: newbie++
>
> In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we
> will first try to get the coordinator for each requested group id, and then
> send the corresponding request for that group id. However, different group
> ids could be hosted on the same coordinator, and these requests do support
> sending multiple group ids within the same request. So we can consider
> optimizing it by grouping the requests per coordinator destination.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
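The optimization proposed in the issue description boils down to bucketing the requested group ids by coordinator before sending anything. A minimal Java sketch, assuming the FindCoordinator lookups have already produced a map from group id to coordinator broker id; `CoordinatorBatching` and `groupByCoordinator` are hypothetical names for illustration, not part of KafkaAdminClient:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CoordinatorBatching {

    /**
     * Buckets group ids by the broker id of their coordinator, so that one multi-group
     * request can be sent per coordinator instead of one request per group id.
     *
     * @param coordinatorOf assumed output of the FindCoordinator lookups: group id -> coordinator broker id
     * @return coordinator broker id -> group ids hosted on that coordinator
     */
    static Map<Integer, List<String>> groupByCoordinator(Map<String, Integer> coordinatorOf) {
        Map<Integer, List<String>> batches = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : coordinatorOf.entrySet()) {
            batches.computeIfAbsent(entry.getValue(), nodeId -> new ArrayList<>()).add(entry.getKey());
        }
        return batches;
    }

    public static void main(String[] args) {
        Map<String, Integer> coordinatorOf = new LinkedHashMap<>();
        coordinatorOf.put("group-a", 1);
        coordinatorOf.put("group-b", 2);
        coordinatorOf.put("group-c", 1);

        // Three group ids but only two coordinators: two requests instead of three.
        groupByCoordinator(coordinatorOf).forEach((nodeId, groupIds) ->
            System.out.println("node " + nodeId + " -> " + groupIds));
        // node 1 -> [group-a, group-c]
        // node 2 -> [group-b]
    }
}
```

Each resulting batch would then become the group id list of a single DescribeGroups or DeleteGroups request addressed to that node.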
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530469#comment-16530469 ]

Yishun Guan commented on KAFKA-6788:

Does it have a retry mechanism now? I am looking at KAFKA-6789 (https://issues.apache.org/jira/browse/KAFKA-6789), which talks about a possible implementation of a retry mechanism.
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530449#comment-16530449 ]

Colin P. McCabe commented on KAFKA-6788:

This is an optimization which we'd like to do at some point, and which hasn't been done yet. Basically, the optimization is that if you have a bunch of groups in the batch request which all share a common group coordinator, we'd like to send one RPC to that group coordinator rather than several.

I think this would be a hard optimization to do correctly because of the error handling issues. If you get an error for some, but not all, elements of the batch, you want to retry just those elements.

The current closed PR looks like it got some wires crossed. It seems to be replacing DescribeGroups with ListGroups, which isn't what we want here. Listing all the groups is not very efficient.
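The partial-failure problem Colin describes can be made concrete: after one batched request, each group id in the response carries its own error code, and only the retriable ones should go into the next attempt. A rough sketch, where `GroupError` is a hypothetical stand-in for Kafka's `Errors` codes and the retriable/non-retriable classification shown is an assumption for illustration:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartialRetry {

    // Hypothetical stand-in for the per-group error codes of a batched response
    // (the real client uses org.apache.kafka.common.protocol.Errors).
    enum GroupError {
        NONE(false),
        COORDINATOR_NOT_AVAILABLE(true),
        NOT_COORDINATOR(true),
        GROUP_AUTHORIZATION_FAILED(false);

        final boolean retriable;

        GroupError(boolean retriable) {
            this.retriable = retriable;
        }
    }

    // Outcome of one batched request, split per group id.
    static final class Triage {
        final List<String> succeeded = new ArrayList<>();
        final List<String> retry = new ArrayList<>();
        final List<String> failed = new ArrayList<>();
    }

    /** Sorts each group id of a batched response into succeeded, retry, or failed. */
    static Triage triage(Map<String, GroupError> errorsByGroup) {
        Triage result = new Triage();
        for (Map.Entry<String, GroupError> entry : errorsByGroup.entrySet()) {
            GroupError error = entry.getValue();
            if (error == GroupError.NONE) {
                result.succeeded.add(entry.getKey());
            } else if (error.retriable) {
                // Only these go into the next request; coordinator errors would also
                // need a fresh FindCoordinator lookup first (not shown).
                result.retry.add(entry.getKey());
            } else {
                result.failed.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, GroupError> response = new LinkedHashMap<>();
        response.put("group-a", GroupError.NONE);
        response.put("group-b", GroupError.NOT_COORDINATOR);
        response.put("group-c", GroupError.GROUP_AUTHORIZATION_FAILED);

        Triage t = triage(response);
        System.out.println("succeeded=" + t.succeeded + " retry=" + t.retry + " failed=" + t.failed);
        // succeeded=[group-a] retry=[group-b] failed=[group-c]
    }
}
```

Because the retried subset may now map to different coordinators, the grouping per coordinator would have to be recomputed for each retry round, which is part of what makes the batched version harder to get right.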
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530220#comment-16530220 ]

Guozhang Wang commented on KAFKA-6788:

[~cmccabe] I'd like you to chime in on this one: what's the current status of this request, and how would we like to fix it?
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530064#comment-16530064 ]

Cyrus Vafadari commented on KAFKA-6788:

[~guozhang] I abandoned it a while ago when it didn't get attention, so I haven't actively worked on it since. I think if the code is moving towards using futures/promises, it might be better to start from scratch. If you like this strategy, though, I'll fix the merge conflicts and reopen the PR.
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16528351#comment-16528351 ]

Guozhang Wang commented on KAFKA-6788:

I saw that [~cvafadari] already has a PR for this, but it was closed. Cyrus, are you still working on this JIRA?
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526483#comment-16526483 ]

Shun Guan commented on KAFKA-6788:

Does this still need to be worked on? Thanks.
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524049#comment-16524049 ]

Shun Guan commented on KAFKA-6788:

Is this story closed? I see there are code commits and a PR, but it doesn't seem like anyone has accepted the PR. [~guozhang]
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508215#comment-16508215 ]

ASF GitHub Bot commented on KAFKA-6788:

cyrusv closed pull request #4878: KAFKA-6788: Combine queries for describe and delete groups in AdminCl…
URL: https://github.com/apache/kafka/pull/4878

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index fa3f943555b..cd79453d29a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2252,14 +2252,17 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection describedGroupIds = new HashSet<>();
+
 for (final Map.Entry> entry : futures.entrySet()) {
 // skip sending request for those futures that already failed.
 if (entry.getValue().isCompletedExceptionally()) continue;
 final String groupId = entry.getKey();
+if (describedGroupIds.contains(groupId)) {
+continue;
+}
 final long startFindCoordinatorMs = time.milliseconds();
 final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
@@ -2274,53 +2277,82 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection future = futures.get(groupId);
-final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId);
+final Set groupIdsToDescribe = new HashSet<>();
+for (ListGroupsResponse.Group group : listResponse.groups()) {
+groupIdsToDescribe.add(group.groupId());
+}
-final Errors groupError = groupMetadata.error();
-if (groupError != Errors.NONE) {
-// TODO: KAFKA-6789, we can retry based on the error code
- future.completeExceptionally(groupError.exception());
-} else {
-final String protocolType = groupMetadata.protocolType();
-if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
-final List members = groupMetadata.members();
-final List consumers = new ArrayList<>(members.size());
-
-for (DescribeGroupsResponse.GroupMember groupMember : members) {
-final PartitionAssignor.Assignment assignment =
- ConsumerProtocol.deserializeAssignment(
- ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment(;
-
-final MemberDescription memberDescription =
-new MemberDescription(
-groupMember.memberId(),
-groupMember.clientId(),
- groupMember.clientHost(),
-new MemberAssignment(assignment.partitions()));
-consumers.add(memberDescription);
+runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
+
+@Override
+AbstractRequest.Builder createRequest(int timeoutMs) {
+return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
+for (String describedCandidate : groupIdsToDescribe) {
+if (response.groups().containsKey(describedCandidate)) {
+ describedGroupIds.add(describedCandidate);
+}
+
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466929#comment-16466929 ]

Liju commented on KAFKA-6788:

Is this still open to be worked on? If no one else is working on it, I can take it up.
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439050#comment-16439050 ]

ASF GitHub Bot commented on KAFKA-6788:

cyrusv opened a new pull request #4878: KAFKA-6788: Combine queries for describe and delete groups in AdminCl…
URL: https://github.com/apache/kafka/pull/4878

1. Ask the coordinator to list its groups
2. Try to delete/describe them all
3. Track which entries are successfully deleted or described
4. Skip (continue over) successfully deleted/described entries

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
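The four steps in this PR description can be sketched as a loop over the requested group ids with a "handled" set. This is an illustration under stated assumptions, not the PR's code: `ListThenBatch`, `plan`, and `groupsOnCoordinatorOf` are hypothetical, the last one standing in for a FindCoordinator plus ListGroups round trip. Unlike step 2 taken literally, the sketch only acts on listed groups that the caller actually requested, so a delete would never touch unrelated groups on the same coordinator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class ListThenBatch {

    /**
     * Plans one describe/delete batch per coordinator visited.
     *
     * @param requested             group ids the caller asked about
     * @param groupsOnCoordinatorOf hypothetical helper: group id -> group ids listed by its coordinator
     * @return the batches that would be sent, in the order the coordinators are visited
     */
    static List<Set<String>> plan(List<String> requested,
                                  Function<String, Collection<String>> groupsOnCoordinatorOf) {
        Set<String> requestedSet = new HashSet<>(requested);
        Set<String> handled = new HashSet<>();
        List<Set<String>> batches = new ArrayList<>();
        for (String groupId : requested) {
            // Step 4: skip groups already covered by an earlier batch.
            if (handled.contains(groupId)) {
                continue;
            }
            // Step 1: ask this group's coordinator to list its groups.
            Set<String> batch = new LinkedHashSet<>();
            batch.add(groupId);
            for (String listed : groupsOnCoordinatorOf.apply(groupId)) {
                // Keep only groups that were actually requested.
                if (requestedSet.contains(listed)) {
                    batch.add(listed);
                }
            }
            // Step 2: a single describe/delete request would be sent for the whole batch (not shown).
            batches.add(batch);
            // Step 3: track handled entries (a real client would record only the successful ones).
            handled.addAll(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        Map<String, Integer> coordinatorOf = new HashMap<>();
        coordinatorOf.put("a", 1);
        coordinatorOf.put("b", 2);
        coordinatorOf.put("c", 1);
        Map<Integer, List<String>> hostedBy = new HashMap<>();
        hostedBy.put(1, Arrays.asList("a", "c", "x")); // "x" exists but was not requested
        hostedBy.put(2, Arrays.asList("b"));

        List<Set<String>> batches =
            plan(Arrays.asList("a", "b", "c"), id -> hostedBy.get(coordinatorOf.get(id)));
        System.out.println(batches); // [[a, c], [b]]
    }
}
```

Note that step 1 costs an extra ListGroups round trip per coordinator, which is the inefficiency raised later in the thread; grouping by the FindCoordinator results alone avoids it.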
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439027#comment-16439027 ]

ASF GitHub Bot commented on KAFKA-6788:

cyrusv closed pull request #4875: KAFKA-6788: Grouping consumer requests per consumer coordinator in admin client
URL: https://github.com/apache/kafka/pull/4875

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index fa3f943555b..8fd92572586 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2252,14 +2252,17 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection describedGroupIds = new HashSet<>();
+
 for (final Map.Entryentry : futures.entrySet()) {
 // skip sending request for those futures that already failed.
 if (entry.getValue().isCompletedExceptionally()) continue;
 final String groupId = entry.getKey();
+if (describedGroupIds.contains(groupId)) {
+continue;
+}
 final long startFindCoordinatorMs = time.milliseconds();
 final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
@@ -2274,61 +2277,83 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection future = futures.get(groupId);
-final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId);
+final Set groupIdsToDescribe = new HashSet<>();
+for (ListGroupsResponse.Group group : listResponse.groups()) {
+groupIdsToDescribe.add(group.groupId());
+}
-final Errors groupError = groupMetadata.error();
-if (groupError != Errors.NONE) {
-// TODO: KAFKA-6789, we can retry based on the error code
- future.completeExceptionally(groupError.exception());
-} else {
-final String protocolType = groupMetadata.protocolType();
-if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
-final List members = groupMetadata.members();
-final List consumers = new ArrayList<>(members.size());
-
-for (DescribeGroupsResponse.GroupMember groupMember : members) {
-final PartitionAssignor.Assignment assignment =
- ConsumerProtocol.deserializeAssignment(
- ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment(;
-
-final MemberDescription memberDescription =
-new MemberDescription(
-groupMember.memberId(),
-groupMember.clientId(),
- groupMember.clientHost(),
-new MemberAssignment(assignment.partitions()));
-consumers.add(memberDescription);
+runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
+
+@Override
+AbstractRequest.Builder createRequest(int timeoutMs) {
+return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
+for (String describedCandidate : groupIdsToDescribe) {
+if (response.groups().containsKey(describedCandidate)) {
+ describedGroupIds.add(describedCandidate);
+
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438957#comment-16438957 ]

ASF GitHub Bot commented on KAFKA-6788:

cyrusv opened a new pull request #4875: KAFKA-6788: Grouping consumer requests per consumer coordinator in admin client
URL: https://github.com/apache/kafka/pull/4875

Resolves KAFKA-6788 for the deleteGroup case.

For the "describeGroup" case, we build a hash set and `continue` when we have already described the group. The describe API lets us submit all groups, so this is the only work we must do.

For the "deleteGroup" case, we ask the coordinator which groups it knows about and delete those. We also use a hash set to `continue` over groups we have already deleted.

Passes tests; no new testing or documentation necessary.