cadonna commented on code in PR #15253:
URL: https://github.com/apache/kafka/pull/15253#discussion_r1472830748
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -89,18 +97,42 @@ public AdminApiLookupStrategy<CoordinatorKey>
lookupStrategy() {
}
@Override
- public DescribeGroupsRequest.Builder buildBatchedRequest(int
coordinatorId, Set<CoordinatorKey> keys) {
- List<String> groupIds = keys.stream().map(key -> {
+ public Collection<RequestAndKeys<CoordinatorKey>> buildRequest(int
coordinatorId, Set<CoordinatorKey> keys) {
+ Set<CoordinatorKey> newConsumerGroups = new HashSet<>();
+ Set<CoordinatorKey> oldConsumerGroups = new HashSet<>();
+
+ keys.forEach(key -> {
if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
- throw new IllegalArgumentException("Invalid transaction
coordinator key " + key +
+ throw new IllegalArgumentException("Invalid group coordinator
key " + key +
" when building `DescribeGroups` request");
}
- return key.idValue;
- }).collect(Collectors.toList());
- DescribeGroupsRequestData data = new DescribeGroupsRequestData()
- .setGroups(groupIds)
- .setIncludeAuthorizedOperations(includeAuthorizedOperations);
- return new DescribeGroupsRequest.Builder(data);
+
+ // Be default, we always try with using the new consumer group
Review Comment:
typo:
```suggestion
// By default, we always try using the new consumer group
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -51,11 +57,12 @@
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
-public class DescribeConsumerGroupsHandler extends
AdminApiHandler.Batched<CoordinatorKey, ConsumerGroupDescription> {
+public class DescribeConsumerGroupsHandler implements
AdminApiHandler<CoordinatorKey, ConsumerGroupDescription> {
private final boolean includeAuthorizedOperations;
private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+ private final Map<String, Boolean> useClassicGroupApi;
Review Comment:
I am wondering whether it would be enough to use a set here instead of a
map. As far as I can see you always put the value `true` into the map. So
either the key exists with value `true` or it does not exist. The case that a
key exists with value `false` does not seem to occur with this code.
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -158,36 +297,67 @@ public ApiResult<CoordinatorKey,
ConsumerGroupDescription> handleResponse(
return new ApiResult<>(completed, failed, new
ArrayList<>(groupsToUnmap));
}
+ private Set<TopicPartition>
convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) {
+ return assignment.topicPartitions().stream().flatMap(topic ->
+ topic.partitions().stream().map(partition ->
+ new TopicPartition(topic.topicName(), partition)
+ )
+ ).collect(Collectors.toSet());
+ }
+
private void handleError(
CoordinatorKey groupId,
Errors error,
+ String errorMsg,
Map<CoordinatorKey, Throwable> failed,
- Set<CoordinatorKey> groupsToUnmap
+ Set<CoordinatorKey> groupsToUnmap,
+ boolean isConsumerGroupResponse
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
- log.debug("`DescribeGroups` request for group id {} failed due
to error {}", groupId.idValue, error);
- failed.put(groupId, error.exception());
+ log.debug("`DescribeGroups` request for group id {} failed due
to error {}.", groupId.idValue, error);
+ failed.put(groupId, error.exception(errorMsg));
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we
just need to retry
log.debug("`DescribeGroups` request for group id {} failed
because the coordinator " +
- "is still in the process of loading state. Will retry",
groupId.idValue);
+ "is still in the process of loading state. Will retry.",
groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
// If the coordinator is unavailable or there was a
coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`DescribeGroups` request for group id {} returned
error {}. " +
- "Will attempt to find the coordinator again and retry",
groupId.idValue, error);
+ "Will attempt to find the coordinator again and retry.",
groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
+ case UNSUPPORTED_VERSION:
+ if (isConsumerGroupResponse) {
+ log.debug("`DescribeGroups` request for group id {} failed
because the API is not " +
+ "supported. Will retry with old API.",
groupId.idValue);
+ useClassicGroupApi.put(groupId.idValue, true);
+ } else {
+ failed.put(groupId, error.exception(errorMsg));
+ }
+ break;
+
+ case GROUP_ID_NOT_FOUND:
+ if (isConsumerGroupResponse) {
+ log.debug("`DescribeGroups` request for group id {} failed
because the group is not " +
+ "a new consumer group. Will retry with old API.",
groupId.idValue);
Review Comment:
Maybe instead of "old API" you could write "DescribeGroups". It makes it a
bit more explicit.
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -158,36 +297,67 @@ public ApiResult<CoordinatorKey,
ConsumerGroupDescription> handleResponse(
return new ApiResult<>(completed, failed, new
ArrayList<>(groupsToUnmap));
}
+ private Set<TopicPartition>
convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) {
+ return assignment.topicPartitions().stream().flatMap(topic ->
+ topic.partitions().stream().map(partition ->
+ new TopicPartition(topic.topicName(), partition)
+ )
+ ).collect(Collectors.toSet());
+ }
+
private void handleError(
CoordinatorKey groupId,
Errors error,
+ String errorMsg,
Map<CoordinatorKey, Throwable> failed,
- Set<CoordinatorKey> groupsToUnmap
+ Set<CoordinatorKey> groupsToUnmap,
+ boolean isConsumerGroupResponse
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
- log.debug("`DescribeGroups` request for group id {} failed due
to error {}", groupId.idValue, error);
- failed.put(groupId, error.exception());
+ log.debug("`DescribeGroups` request for group id {} failed due
to error {}.", groupId.idValue, error);
+ failed.put(groupId, error.exception(errorMsg));
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we
just need to retry
log.debug("`DescribeGroups` request for group id {} failed
because the coordinator " +
- "is still in the process of loading state. Will retry",
groupId.idValue);
+ "is still in the process of loading state. Will retry.",
groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
// If the coordinator is unavailable or there was a
coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`DescribeGroups` request for group id {} returned
error {}. " +
- "Will attempt to find the coordinator again and retry",
groupId.idValue, error);
+ "Will attempt to find the coordinator again and retry.",
groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
+ case UNSUPPORTED_VERSION:
+ if (isConsumerGroupResponse) {
+ log.debug("`DescribeGroups` request for group id {} failed
because the API is not " +
+ "supported. Will retry with old API.",
groupId.idValue);
+ useClassicGroupApi.put(groupId.idValue, true);
+ } else {
+ failed.put(groupId, error.exception(errorMsg));
+ }
+ break;
+
+ case GROUP_ID_NOT_FOUND:
+ if (isConsumerGroupResponse) {
+ log.debug("`DescribeGroups` request for group id {} failed
because the group is not " +
Review Comment:
Shouldn't this be:
```suggestion
log.debug("`ConsumerGroupDescribe` request for group id
{} failed because the group is not " +
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -158,36 +297,67 @@ public ApiResult<CoordinatorKey,
ConsumerGroupDescription> handleResponse(
return new ApiResult<>(completed, failed, new
ArrayList<>(groupsToUnmap));
}
+ private Set<TopicPartition>
convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) {
+ return assignment.topicPartitions().stream().flatMap(topic ->
+ topic.partitions().stream().map(partition ->
+ new TopicPartition(topic.topicName(), partition)
+ )
+ ).collect(Collectors.toSet());
+ }
+
private void handleError(
CoordinatorKey groupId,
Errors error,
+ String errorMsg,
Map<CoordinatorKey, Throwable> failed,
- Set<CoordinatorKey> groupsToUnmap
+ Set<CoordinatorKey> groupsToUnmap,
+ boolean isConsumerGroupResponse
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
- log.debug("`DescribeGroups` request for group id {} failed due
to error {}", groupId.idValue, error);
- failed.put(groupId, error.exception());
+ log.debug("`DescribeGroups` request for group id {} failed due
to error {}.", groupId.idValue, error);
+ failed.put(groupId, error.exception(errorMsg));
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we
just need to retry
log.debug("`DescribeGroups` request for group id {} failed
because the coordinator " +
- "is still in the process of loading state. Will retry",
groupId.idValue);
+ "is still in the process of loading state. Will retry.",
groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
// If the coordinator is unavailable or there was a
coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`DescribeGroups` request for group id {} returned
error {}. " +
- "Will attempt to find the coordinator again and retry",
groupId.idValue, error);
+ "Will attempt to find the coordinator again and retry.",
groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
+ case UNSUPPORTED_VERSION:
+ if (isConsumerGroupResponse) {
+ log.debug("`DescribeGroups` request for group id {} failed
because the API is not " +
+ "supported. Will retry with old API.",
groupId.idValue);
+ useClassicGroupApi.put(groupId.idValue, true);
+ } else {
+ failed.put(groupId, error.exception(errorMsg));
+ }
+ break;
+
+ case GROUP_ID_NOT_FOUND:
+ if (isConsumerGroupResponse) {
+ log.debug("`DescribeGroups` request for group id {} failed
because the group is not " +
+ "a new consumer group. Will retry with old API.",
groupId.idValue);
+ useClassicGroupApi.put(groupId.idValue, true);
+ } else {
+ log.error("`DescribeGroups` request for group id {}
because the group does not exist.", groupId.idValue);
+ failed.put(groupId, error.exception(errorMsg));
+ }
+ break;
+
default:
- log.error("`DescribeGroups` request for group id {} failed due
to unexpected error {}", groupId.idValue, error);
- failed.put(groupId, error.exception());
+ log.error("`DescribeGroups` request for group id {} failed due
to unexpected error {}.", groupId.idValue, error);
Review Comment:
Here you could differentiate between the old (`DescribeGroups`) and the new
(`ConsumerGroupDescribe`) API in the log message.
##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java:
##########
@@ -20,63 +20,127 @@
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ConsumerGroupDescribeRequestData;
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
+import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.ConsumerGroupDescribeResponse;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
public class DescribeConsumerGroupsHandlerTest {
private final LogContext logContext = new LogContext();
private final String groupId1 = "group-id1";
private final String groupId2 = "group-id2";
- private final Set<String> groupIds = new HashSet<>(Arrays.asList(groupId1,
groupId2));
- private final Set<CoordinatorKey> keys = groupIds.stream()
- .map(CoordinatorKey::byGroupId)
- .collect(Collectors.toSet());
+ private final Set<String> groupIds = new LinkedHashSet<>(Arrays.asList(
+ groupId1,
+ groupId2
+ ));
+ private final Set<CoordinatorKey> keys = new LinkedHashSet<>(Arrays.asList(
+ CoordinatorKey.byGroupId(groupId1),
+ CoordinatorKey.byGroupId(groupId2)
+ ));
private final Node coordinator = new Node(1, "host", 1234);
private final Set<TopicPartition> tps = new HashSet<>(Arrays.asList(
- new TopicPartition("foo", 0), new TopicPartition("bar", 1)));
+ new TopicPartition("foo", 0),
+ new TopicPartition("bar", 1)
+ ));
- @Test
- public void testBuildRequest() {
- DescribeConsumerGroupsHandler handler = new
DescribeConsumerGroupsHandler(false, logContext);
- DescribeGroupsRequest request = handler.buildBatchedRequest(1,
keys).build();
- assertEquals(2, request.data().groups().size());
- assertFalse(request.data().includeAuthorizedOperations());
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testBuildRequest(boolean includeAuthorizedOperations) {
Review Comment:
A more meaningful name would make the test a bit clearer.
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -158,36 +297,67 @@ public ApiResult<CoordinatorKey,
ConsumerGroupDescription> handleResponse(
return new ApiResult<>(completed, failed, new
ArrayList<>(groupsToUnmap));
}
+ private Set<TopicPartition>
convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) {
+ return assignment.topicPartitions().stream().flatMap(topic ->
+ topic.partitions().stream().map(partition ->
+ new TopicPartition(topic.topicName(), partition)
+ )
+ ).collect(Collectors.toSet());
+ }
+
private void handleError(
CoordinatorKey groupId,
Errors error,
+ String errorMsg,
Map<CoordinatorKey, Throwable> failed,
- Set<CoordinatorKey> groupsToUnmap
+ Set<CoordinatorKey> groupsToUnmap,
+ boolean isConsumerGroupResponse
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
- log.debug("`DescribeGroups` request for group id {} failed due
to error {}", groupId.idValue, error);
- failed.put(groupId, error.exception());
+ log.debug("`DescribeGroups` request for group id {} failed due
to error {}.", groupId.idValue, error);
+ failed.put(groupId, error.exception(errorMsg));
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we
just need to retry
log.debug("`DescribeGroups` request for group id {} failed
because the coordinator " +
- "is still in the process of loading state. Will retry",
groupId.idValue);
+ "is still in the process of loading state. Will retry.",
groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
// If the coordinator is unavailable or there was a
coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`DescribeGroups` request for group id {} returned
error {}. " +
- "Will attempt to find the coordinator again and retry",
groupId.idValue, error);
+ "Will attempt to find the coordinator again and retry.",
groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
+ case UNSUPPORTED_VERSION:
+ if (isConsumerGroupResponse) {
+ log.debug("`DescribeGroups` request for group id {} failed
because the API is not " +
Review Comment:
Shouldn't this be:
```suggestion
log.debug("`ConsumerGroupDescribe` request for group id
{} failed because the API is not " +
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -158,36 +297,67 @@ public ApiResult<CoordinatorKey,
ConsumerGroupDescription> handleResponse(
return new ApiResult<>(completed, failed, new
ArrayList<>(groupsToUnmap));
}
+ private Set<TopicPartition>
convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) {
+ return assignment.topicPartitions().stream().flatMap(topic ->
+ topic.partitions().stream().map(partition ->
+ new TopicPartition(topic.topicName(), partition)
+ )
+ ).collect(Collectors.toSet());
+ }
+
private void handleError(
CoordinatorKey groupId,
Errors error,
+ String errorMsg,
Map<CoordinatorKey, Throwable> failed,
- Set<CoordinatorKey> groupsToUnmap
+ Set<CoordinatorKey> groupsToUnmap,
+ boolean isConsumerGroupResponse
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
- log.debug("`DescribeGroups` request for group id {} failed due
to error {}", groupId.idValue, error);
- failed.put(groupId, error.exception());
+ log.debug("`DescribeGroups` request for group id {} failed due
to error {}.", groupId.idValue, error);
+ failed.put(groupId, error.exception(errorMsg));
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we
just need to retry
log.debug("`DescribeGroups` request for group id {} failed
because the coordinator " +
- "is still in the process of loading state. Will retry",
groupId.idValue);
+ "is still in the process of loading state. Will retry.",
groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
// If the coordinator is unavailable or there was a
coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`DescribeGroups` request for group id {} returned
error {}. " +
- "Will attempt to find the coordinator again and retry",
groupId.idValue, error);
+ "Will attempt to find the coordinator again and retry.",
groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
+ case UNSUPPORTED_VERSION:
+ if (isConsumerGroupResponse) {
+ log.debug("`DescribeGroups` request for group id {} failed
because the API is not " +
+ "supported. Will retry with old API.",
groupId.idValue);
Review Comment:
Maybe instead of "old API" you could write "DescribeGroups". It makes it a
bit more explicit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]