This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new 66000787c1 KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964) 66000787c1 is described below commit 66000787c1146c6d08d88b9c564f5a000608f013 Author: Sanjana Kaundinya <skaundi...@gmail.com> AuthorDate: Thu Jul 14 05:47:34 2022 -0700 KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964) This implements the AdminAPI portion of KIP-709: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258. The request/response protocol changes were implemented in 3.0.0. A new batched API has been introduced to list consumer offsets for different groups. For brokers older than 3.0.0, separate requests are sent for each group. Co-authored-by: Rajini Sivaram <rajinisiva...@googlemail.com> Co-authored-by: David Jacot <dja...@confluent.io> Reviewers: David Jacot <dja...@confluent.io>, Rajini Sivaram <rajinisiva...@googlemail.com> --- .../java/org/apache/kafka/clients/admin/Admin.java | 36 ++- .../kafka/clients/admin/KafkaAdminClient.java | 11 +- .../admin/ListConsumerGroupOffsetsOptions.java | 14 +- .../admin/ListConsumerGroupOffsetsResult.java | 56 +++- .../admin/ListConsumerGroupOffsetsSpec.java | 79 ++++++ .../clients/admin/internals/AdminApiDriver.java | 3 +- .../admin/internals/CoordinatorStrategy.java | 4 + .../internals/ListConsumerGroupOffsetsHandler.java | 128 +++++---- .../kafka/common/requests/OffsetFetchResponse.java | 10 +- .../kafka/clients/admin/AdminClientTestUtils.java | 12 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 206 ++++++++++++-- .../kafka/clients/admin/MockAdminClient.java | 16 +- .../ListConsumerGroupOffsetsHandlerTest.java | 308 +++++++++++++++++++-- .../kafka/clients/consumer/KafkaConsumerTest.java | 6 +- .../internals/ConsumerCoordinatorTest.java | 26 +- .../scala/kafka/admin/ConsumerGroupCommand.scala | 8 +- .../kafka/admin/ConsumerGroupServiceTest.scala | 22 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +- 
.../processor/internals/StoreChangelogReader.java | 12 +- .../internals/StoreChangelogReaderTest.java | 11 +- 20 files changed, 813 insertions(+), 157 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index fdacc09db8..0698d29702 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.LeaveGroupResponse; import java.time.Duration; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -919,12 +920,20 @@ public interface Admin extends AutoCloseable { * @param options The options to use when listing the consumer group offsets. * @return The ListGroupOffsetsResult */ - ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options); + default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) { + ListConsumerGroupOffsetsOptions listOptions = new ListConsumerGroupOffsetsOptions() + .requireStable(options.requireStable()); + @SuppressWarnings("deprecation") + ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() + .topicPartitions(options.topicPartitions()); + return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), listOptions); + } /** * List the consumer group offsets available in the cluster with the default options. * <p> - * This is a convenience method for {@link #listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options. + * This is a convenience method for {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)} + * to list offsets of all partitions of one group with default options. * * @return The ListGroupOffsetsResult. 
*/ @@ -932,6 +941,29 @@ public interface Admin extends AutoCloseable { return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions()); } + /** + * List the consumer group offsets available in the cluster for the specified consumer groups. + * + * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for. + * + * @param options The options to use when listing the consumer group offsets. + * @return The ListConsumerGroupOffsetsResult + */ + ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options); + + /** + * List the consumer group offsets available in the cluster for the specified groups with the default options. + * <p> + * This is a convenience method for + * {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)} with default options. + * + * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for. + * @return The ListConsumerGroupOffsetsResult. + */ + default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs) { + return listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions()); + } + /** * Delete consumer groups from the cluster. 
* diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 2b2642e351..41eb27a1dd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3401,13 +3401,14 @@ public class KafkaAdminClient extends AdminClient { } @Override - public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, - final ListConsumerGroupOffsetsOptions options) { + public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, + ListConsumerGroupOffsetsOptions options) { SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> future = - ListConsumerGroupOffsetsHandler.newFuture(groupId); - ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), options.requireStable(), logContext); + ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet()); + ListConsumerGroupOffsetsHandler handler = + new ListConsumerGroupOffsetsHandler(groupSpecs, options.requireStable(), logContext); invokeDriver(handler, future, options.timeoutMs); - return new ListConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId))); + return new ListConsumerGroupOffsetsResult(future.all()); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java index 292a47ef39..44d3a40732 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java @@ -23,23 +23,28 @@ import org.apache.kafka.common.annotation.InterfaceStability; import java.util.List; /** - * 
Options for {@link Admin#listConsumerGroupOffsets(String)}. + * Options for {@link Admin#listConsumerGroupOffsets(java.util.Map)} and {@link Admin#listConsumerGroupOffsets(String)}. * <p> * The API of this class is evolving, see {@link Admin} for details. */ @InterfaceStability.Evolving public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> { - private List<TopicPartition> topicPartitions = null; + private List<TopicPartition> topicPartitions; private boolean requireStable = false; /** * Set the topic partitions to list as part of the result. * {@code null} includes all topic partitions. + * <p> + * @deprecated Since 3.3. + * Use {@link Admin#listConsumerGroupOffsets(java.util.Map, ListConsumerGroupOffsetsOptions)} + * to specify topic partitions. * * @param topicPartitions List of topic partitions to include * @return This ListGroupOffsetsOptions */ + @Deprecated public ListConsumerGroupOffsetsOptions topicPartitions(List<TopicPartition> topicPartitions) { this.topicPartitions = topicPartitions; return this; @@ -55,7 +60,12 @@ public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsume /** * Returns a list of topic partitions to add as part of the result. + * <p> + * @deprecated Since 3.3. + * Use {@link Admin#listConsumerGroupOffsets(java.util.Map, ListConsumerGroupOffsetsOptions)} + * to specify topic partitions. 
*/ + @Deprecated public List<TopicPartition> topicPartitions() { return topicPartitions; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java index 48f4531418..2136e33a40 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java @@ -17,25 +17,32 @@ package org.apache.kafka.clients.admin; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.admin.internals.CoordinatorKey; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; -import java.util.Map; - /** - * The result of the {@link Admin#listConsumerGroupOffsets(String)} call. + * The result of the {@link Admin#listConsumerGroupOffsets(Map)} and + * {@link Admin#listConsumerGroupOffsets(String)} call. * <p> * The API of this class is evolving, see {@link Admin} for details. 
*/ @InterfaceStability.Evolving public class ListConsumerGroupOffsetsResult { - final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future; + final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures; - ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) { - this.future = future; + ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) { + this.futures = futures.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().idValue, Entry::getValue)); } /** @@ -43,7 +50,42 @@ public class ListConsumerGroupOffsetsResult { * If the group does not have a committed offset for this partition, the corresponding value in the returned map will be null. */ public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() { - return future; + if (futures.size() != 1) { + throw new IllegalStateException("Offsets from multiple consumer groups were requested. " + + "Use partitionsToOffsetAndMetadata(groupId) instead to get future for a specific group."); + } + return futures.values().iterator().next(); } + /** + * Return a future which yields a map of topic partitions to OffsetAndMetadata objects for + * the specified group. If the group doesn't have a committed offset for a specific + * partition, the corresponding value in the returned map will be null. + */ + public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) { + if (!futures.containsKey(groupId)) + throw new IllegalArgumentException("Offsets for consumer group '" + groupId + "' were not requested."); + return futures.get(groupId); + } + + /** + * Return a future which yields all Map<String, Map<TopicPartition, OffsetAndMetadata> objects, + * if requests for all the groups succeed. 
+ */ + public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply( + nil -> { + Map<String, Map<TopicPartition, OffsetAndMetadata>> listedConsumerGroupOffsets = new HashMap<>(futures.size()); + futures.forEach((key, future) -> { + try { + listedConsumerGroupOffsets.put(key, future.get()); + } catch (InterruptedException | ExecutionException e) { + // This should be unreachable, since the KafkaFuture#allOf already ensured + // that all of the futures completed successfully. + throw new RuntimeException(e); + } + }); + return listedConsumerGroupOffsets; + }); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java new file mode 100644 index 0000000000..83858e49c8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; +import java.util.Objects; + +/** + * Specification of consumer group offsets to list using {@link Admin#listConsumerGroupOffsets(java.util.Map)}. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListConsumerGroupOffsetsSpec { + + private Collection<TopicPartition> topicPartitions; + + /** + * Set the topic partitions whose offsets are to be listed for a consumer group. + * {@code null} includes all topic partitions. + * + * @param topicPartitions List of topic partitions to include + * @return This ListConsumerGroupOffsetSpec + */ + public ListConsumerGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) { + this.topicPartitions = topicPartitions; + return this; + } + + /** + * Returns the topic partitions whose offsets are to be listed for a consumer group. + * {@code null} indicates that offsets of all partitions of the group are to be listed. 
+ */ + public Collection<TopicPartition> topicPartitions() { + return topicPartitions; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ListConsumerGroupOffsetsSpec)) { + return false; + } + ListConsumerGroupOffsetsSpec that = (ListConsumerGroupOffsetsSpec) o; + return Objects.equals(topicPartitions, that.topicPartitions); + } + + @Override + public int hashCode() { + return Objects.hash(topicPartitions); + } + + @Override + public String toString() { + return "ListConsumerGroupOffsetsSpec(" + + "topicPartitions=" + topicPartitions + + ')'; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java index d00db4b18c..0e1b03d964 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException; +import org.apache.kafka.common.requests.OffsetFetchRequest.NoBatchedOffsetFetchRequestException; import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; @@ -253,7 +254,7 @@ public class AdminApiDriver<K, V> { .collect(Collectors.toSet()); retryLookup(keysToUnmap); - } else if (t instanceof NoBatchedFindCoordinatorsException) { + } else if (t instanceof NoBatchedFindCoordinatorsException || t instanceof NoBatchedOffsetFetchRequestException) { ((CoordinatorStrategy) handler.lookupStrategy()).disableBatch(); Set<K> keysToUnmap = spec.keys.stream() .filter(future.lookupKeys()::contains) diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java index e6fc0d624a..02b68527c3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java @@ -120,6 +120,10 @@ public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKe batch = false; } + public boolean batch() { + return batch; + } + private CoordinatorKey requireSingletonAndType(Set<CoordinatorKey> keys) { if (keys.size() != 1) { throw new IllegalArgumentException("Unexpected size of key set: expected 1, but got " + keys.size()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java index 08648821f7..21c7d8d488 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java @@ -17,14 +17,16 @@ package org.apache.kafka.clients.admin.internals; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -36,39 +38,26 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; -public class 
ListConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> { +public class ListConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> { - private final CoordinatorKey groupId; - private final List<TopicPartition> partitions; private final boolean requireStable; + private final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs; private final Logger log; - private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy; + private final CoordinatorStrategy lookupStrategy; public ListConsumerGroupOffsetsHandler( - String groupId, - List<TopicPartition> partitions, - LogContext logContext - ) { - this(groupId, partitions, false, logContext); - } - - public ListConsumerGroupOffsetsHandler( - String groupId, - List<TopicPartition> partitions, + Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, boolean requireStable, LogContext logContext ) { - this.groupId = CoordinatorKey.byGroupId(groupId); - this.partitions = partitions; - this.requireStable = requireStable; this.log = logContext.logger(ListConsumerGroupOffsetsHandler.class); this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext); + this.groupSpecs = groupSpecs; + this.requireStable = requireStable; } - public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> newFuture( - String groupId - ) { - return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId))); + public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> newFuture(Collection<String> groupIds) { + return AdminApiFuture.forKeys(coordinatorKeys(groupIds)); } @Override @@ -82,16 +71,45 @@ public class ListConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Coo } private void validateKeys(Set<CoordinatorKey> groupIds) { - if (!groupIds.equals(Collections.singleton(groupId))) { + 
Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet()); + if (!keys.containsAll(groupIds)) { throw new IllegalArgumentException("Received unexpected group ids " + groupIds + - " (expected only " + Collections.singleton(groupId) + ")"); + " (expected one of " + keys + ")"); } } + private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds) { + return groupIds.stream() + .map(CoordinatorKey::byGroupId) + .collect(Collectors.toSet()); + } + + public OffsetFetchRequest.Builder buildBatchedRequest(Set<CoordinatorKey> groupIds) { + // Create a map that only contains the consumer groups owned by the coordinator. + Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions = new HashMap<>(groupIds.size()); + groupIds.forEach(g -> { + ListConsumerGroupOffsetsSpec spec = groupSpecs.get(g.idValue); + List<TopicPartition> partitions = spec.topicPartitions() != null ? new ArrayList<>(spec.topicPartitions()) : null; + coordinatorGroupIdToTopicPartitions.put(g.idValue, partitions); + }); + + return new OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, requireStable, false); + } + @Override - public OffsetFetchRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> groupIds) { + public Collection<RequestAndKeys<CoordinatorKey>> buildRequest(int brokerId, Set<CoordinatorKey> groupIds) { validateKeys(groupIds); - return new OffsetFetchRequest.Builder(groupId.idValue, requireStable, partitions, false); + + // When the OffsetFetchRequest fails with NoBatchedOffsetFetchRequestException, we completely disable + // the batching end-to-end, including the FindCoordinatorRequest. 
+ if (lookupStrategy.batch()) { + return Collections.singletonList(new RequestAndKeys<>(buildBatchedRequest(groupIds), groupIds)); + } else { + return groupIds.stream().map(groupId -> { + Set<CoordinatorKey> keys = Collections.singleton(groupId); + return new RequestAndKeys<>(buildBatchedRequest(keys), keys); + }).collect(Collectors.toList()); + } } @Override @@ -104,44 +122,46 @@ public class ListConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Coo final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse; - // the groupError will contain the group level error for v0-v8 OffsetFetchResponse - Errors groupError = response.groupLevelError(groupId.idValue); - if (groupError != Errors.NONE) { - final Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); - - handleGroupError(groupId, groupError, failed, groupsToUnmap); - - return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap)); - } else { - final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>(); - - response.partitionDataMap(groupId.idValue).forEach((topicPartition, partitionData) -> { - final Errors error = partitionData.error; - if (error == Errors.NONE) { - final long offset = partitionData.offset; - final String metadata = partitionData.metadata; - final Optional<Integer> leaderEpoch = partitionData.leaderEpoch; - // Negative offset indicates that the group has no committed offset for this partition - if (offset < 0) { - groupOffsetsListing.put(topicPartition, null); + Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>(); + Map<CoordinatorKey, Throwable> failed = new HashMap<>(); + List<CoordinatorKey> unmapped = new ArrayList<>(); + for (CoordinatorKey coordinatorKey : groupIds) { + String group = coordinatorKey.idValue; + if (response.groupHasError(group)) { + handleGroupError(CoordinatorKey.byGroupId(group), 
response.groupLevelError(group), failed, unmapped); + } else { + final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>(); + Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = response.partitionDataMap(group); + for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> partitionEntry : responseData.entrySet()) { + final TopicPartition topicPartition = partitionEntry.getKey(); + OffsetFetchResponse.PartitionData partitionData = partitionEntry.getValue(); + final Errors error = partitionData.error; + + if (error == Errors.NONE) { + final long offset = partitionData.offset; + final String metadata = partitionData.metadata; + final Optional<Integer> leaderEpoch = partitionData.leaderEpoch; + // Negative offset indicates that the group has no committed offset for this partition + if (offset < 0) { + groupOffsetsListing.put(topicPartition, null); + } else { + groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata)); + } } else { - groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata)); + log.warn("Skipping return offset for {} due to error {}.", topicPartition, error); } - } else { - log.warn("Skipping return offset for {} due to error {}.", topicPartition, error); } - }); - - return ApiResult.completed(groupId, groupOffsetsListing); + completed.put(CoordinatorKey.byGroupId(group), groupOffsetsListing); + } } + return new ApiResult<>(completed, failed, unmapped); } private void handleGroupError( CoordinatorKey groupId, Errors error, Map<CoordinatorKey, Throwable> failed, - Set<CoordinatorKey> groupsToUnmap + List<CoordinatorKey> groupsToUnmap ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index 213182ec8c..4e25984668 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -173,8 +173,8 @@ public class OffsetFetchResponse extends AbstractResponse { * @param responseData Fetched offset information grouped by topic-partition and by group */ public OffsetFetchResponse(int throttleTimeMs, - Map<String, Errors> errors, Map<String, - Map<TopicPartition, PartitionData>> responseData) { + Map<String, Errors> errors, + Map<String, Map<TopicPartition, PartitionData>> responseData) { super(ApiKeys.OFFSET_FETCH); List<OffsetFetchResponseGroup> groupList = new ArrayList<>(); for (Entry<String, Map<TopicPartition, PartitionData>> entry : responseData.entrySet()) { @@ -250,7 +250,11 @@ public class OffsetFetchResponse extends AbstractResponse { } public boolean groupHasError(String groupId) { - return groupLevelErrors.get(groupId) != Errors.NONE; + Errors error = groupLevelErrors.get(groupId); + if (error == null) { + return this.error != null && this.error != Errors.NONE; + } + return error != Errors.NONE; } public Errors error() { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java index 6f98a166b1..d8b9f427d6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import org.apache.kafka.clients.HostResolver; import org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig; import org.apache.kafka.clients.admin.internals.MetadataOperationContext; +import org.apache.kafka.clients.admin.internals.CoordinatorKey; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; @@ -104,14 
+105,17 @@ public class AdminClientTestUtils { .collect(Collectors.toMap(Map.Entry::getKey, e -> KafkaFuture.completedFuture(e.getValue())))); } - public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(Map<TopicPartition, OffsetAndMetadata> offsets) { - return new ListConsumerGroupOffsetsResult(KafkaFuture.completedFuture(offsets)); + public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets) { + Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> resultMap = offsets.entrySet().stream() + .collect(Collectors.toMap(e -> CoordinatorKey.byGroupId(e.getKey()), + e -> KafkaFutureImpl.completedFuture(e.getValue()))); + return new ListConsumerGroupOffsetsResult(resultMap); } - public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(KafkaException exception) { + public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(String group, KafkaException exception) { final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> future = new KafkaFutureImpl<>(); future.completeExceptionally(exception); - return new ListConsumerGroupOffsetsResult(future); + return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future)); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 61a2aaa00b..3d285a45f7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -108,6 +108,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData; import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo; import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.FindCoordinatorResponseData; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; @@ -192,6 +193,7 @@ import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.OffsetDeleteResponse; import org.apache.kafka.common.requests.OffsetFetchRequest; import org.apache.kafka.common.requests.OffsetFetchResponse; +import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData; import org.apache.kafka.common.requests.RequestTestUtils; import org.apache.kafka.common.requests.UnregisterBrokerResponse; import org.apache.kafka.common.requests.UpdateFeaturesRequest; @@ -224,6 +226,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; @@ -266,6 +269,7 @@ import static org.junit.jupiter.api.Assertions.fail; public class KafkaAdminClientTest { private static final Logger log = LoggerFactory.getLogger(KafkaAdminClientTest.class); private static final String GROUP_ID = "group-0"; + private static final int THROTTLE = 10; @Test public void testDefaultApiTimeoutAndRequestTimeoutConflicts() { @@ -501,6 +505,21 @@ public class KafkaAdminClientTest { return FindCoordinatorResponse.prepareOldResponse(error, node); } + private static FindCoordinatorResponse prepareBatchedFindCoordinatorResponse(Errors error, Node node, Collection<String> groups) { + FindCoordinatorResponseData data = new FindCoordinatorResponseData(); + List<FindCoordinatorResponseData.Coordinator> coordinators = groups.stream() + .map(group -> new FindCoordinatorResponseData.Coordinator() + 
.setErrorCode(error.code()) + .setErrorMessage(error.message()) + .setKey(group) + .setHost(node.host()) + .setPort(node.port()) + .setNodeId(node.id())) + .collect(Collectors.toList()); + data.setCoordinators(coordinators); + return new FindCoordinatorResponse(data); + } + private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) { List<MetadataResponseTopic> metadata = new ArrayList<>(); for (String topic : cluster.topics()) { @@ -3067,9 +3086,11 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); final TopicPartition tp1 = new TopicPartition("A", 0); - final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions(); - options.topicPartitions(Collections.singletonList(tp1)).requireStable(true); - final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(GROUP_ID, options); + final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions() + .requireStable(true); + final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() + .topicPartitions(Collections.singletonList(tp1)); + env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, groupSpec), options); final MockClient mockClient = env.kafkaClient(); TestUtils.waitForCondition(() -> { @@ -3077,11 +3098,11 @@ public class KafkaAdminClientTest { if (clientRequest != null) { OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data; return data.requireStable() && - data.topics().get(0).name().equals("A") && - data.topics().get(0).partitionIndexes().equals(Collections.singletonList(0)); + data.groups().get(0).topics().get(0).name().equals("A") && + data.groups().get(0).topics().get(0).partitionIndexes().equals(Collections.singletonList(0)); } return false; - }, "Failed awaiting ListConsumerGroupOffsets request"); + }, "Failed awaiting 
ListConsumerGroupOfsets request"); } } @@ -3095,12 +3116,11 @@ public class KafkaAdminClientTest { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); + env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(GROUP_ID); - TestUtils.assertFutureError(result.partitionsToOffsetAndMetadata(), TimeoutException.class); } } @@ -3124,16 +3144,16 @@ public class KafkaAdminClientTest { mockClient.prepareResponse(body -> { firstAttemptTime.set(time.milliseconds()); return true; - }, new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); + }, offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); mockClient.prepareResponse(body -> { secondAttemptTime.set(time.milliseconds()); return true; - }, new OffsetFetchResponse(Errors.NONE, Collections.emptyMap())); + }, offsetFetchResponse(Errors.NONE, Collections.emptyMap())); - final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future = env.adminClient().listConsumerGroupOffsets("group-0").partitionsToOffsetAndMetadata(); + final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future = env.adminClient().listConsumerGroupOffsets(GROUP_ID).partitionsToOffsetAndMetadata(); TestUtils.waitForCondition(() -> mockClient.numAwaitingResponses() == 1, "Failed awaiting ListConsumerGroupOffsets first request failure"); TestUtils.waitForCondition(() -> ((KafkaAdminClient) env.adminClient()).numPendingCalls() == 1, "Failed to add retry 
ListConsumerGroupOffsets call on first failure"); @@ -3157,7 +3177,8 @@ public class KafkaAdminClientTest { prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse( - new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap())); + offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap())); + /* * We need to return two responses here, one for NOT_COORDINATOR call when calling list consumer offsets * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a @@ -3166,19 +3187,19 @@ public class KafkaAdminClientTest { * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ env.kafkaClient().prepareResponse( - new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); + offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse( - new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); + offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse( - new OffsetFetchResponse(Errors.NONE, Collections.emptyMap())); + offsetFetchResponse(Errors.NONE, Collections.emptyMap())); final ListConsumerGroupOffsetsResult errorResult1 = env.adminClient().listConsumerGroupOffsets(GROUP_ID); @@ -3199,8 +3220,7 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse( - new OffsetFetchResponse(error, Collections.emptyMap())); + env.kafkaClient().prepareResponse(offsetFetchResponse(error, Collections.emptyMap())); 
ListConsumerGroupOffsetsResult errorResult = env.adminClient().listConsumerGroupOffsets(GROUP_ID); @@ -3220,7 +3240,7 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); // Retriable errors should be retried - env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap())); + env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap())); /* * We need to return two responses here, one for NOT_COORDINATOR error when calling list consumer group offsets @@ -3229,10 +3249,10 @@ public class KafkaAdminClientTest { * * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ - env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); + env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); + env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); @@ -3249,7 +3269,7 @@ public class KafkaAdminClientTest { Optional.empty(), "", Errors.NONE)); responseData.put(myTopicPartition3, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE)); - env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData)); + env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NONE, responseData)); final ListConsumerGroupOffsetsResult 
result = env.adminClient().listConsumerGroupOffsets(GROUP_ID); final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata().get(); @@ -3263,6 +3283,144 @@ public class KafkaAdminClientTest { } } + @Test + public void testBatchedListConsumerGroupOffsets() throws Exception { + Cluster cluster = mockCluster(1, 0); + Time time = new MockTime(); + Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = batchedListConsumerGroupOffsetsSpec(); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RETRIES_CONFIG, "0")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE, env.cluster().controller(), groupSpecs.keySet())); + + ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions()); + sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true, Errors.NONE); + + verifyListOffsetsForMultipleGroups(groupSpecs, result); + } + } + + @Test + public void testBatchedListConsumerGroupOffsetsWithNoFindCoordinatorBatching() throws Exception { + Cluster cluster = mockCluster(1, 0); + Time time = new MockTime(); + Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = batchedListConsumerGroupOffsetsSpec(); + + ApiVersion findCoordinatorV3 = new ApiVersion() + .setApiKey(ApiKeys.FIND_COORDINATOR.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 3); + ApiVersion offsetFetchV7 = new ApiVersion() + .setApiKey(ApiKeys.OFFSET_FETCH.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 7); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3, offsetFetchV7))); + 
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); + env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(groupSpecs); + + // Fail the first request in order to ensure that the group is not batched when retried. + sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.COORDINATOR_LOAD_IN_PROGRESS); + + sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.NONE); + sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.NONE); + + verifyListOffsetsForMultipleGroups(groupSpecs, result); + } + } + + @Test + public void testBatchedListConsumerGroupOffsetsWithNoOffsetFetchBatching() throws Exception { + Cluster cluster = mockCluster(1, 0); + Time time = new MockTime(); + Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = batchedListConsumerGroupOffsetsSpec(); + + ApiVersion offsetFetchV7 = new ApiVersion() + .setApiKey(ApiKeys.OFFSET_FETCH.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 7); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singleton(offsetFetchV7))); + env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE, env.cluster().controller(), groupSpecs.keySet())); + // Prepare a response to force client to attempt batched request creation that throws + // NoBatchedOffsetFetchRequestException. This triggers creation of non-batched requests. 
+ env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); + + ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(groupSpecs); + + // The request handler attempts both FindCoordinator and OffsetFetch requests. This seems + // ok since we expect this scenario only during upgrades from versions < 3.0.0 where + // some upgraded brokers could handle batched FindCoordinator while non-upgraded coordinators + // rejected batched OffsetFetch requests. + sendFindCoordinatorResponse(env.kafkaClient(), env.cluster().controller()); + sendFindCoordinatorResponse(env.kafkaClient(), env.cluster().controller()); + sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.NONE); + sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.NONE); + + verifyListOffsetsForMultipleGroups(groupSpecs, result); + } + } + + private Map<String, ListConsumerGroupOffsetsSpec> batchedListConsumerGroupOffsetsSpec() { + Set<TopicPartition> groupAPartitions = Collections.singleton(new TopicPartition("A", 1)); + Set<TopicPartition> groupBPartitions = Collections.singleton(new TopicPartition("B", 2)); + + ListConsumerGroupOffsetsSpec groupASpec = new ListConsumerGroupOffsetsSpec().topicPartitions(groupAPartitions); + ListConsumerGroupOffsetsSpec groupBSpec = new ListConsumerGroupOffsetsSpec().topicPartitions(groupBPartitions); + return Utils.mkMap(Utils.mkEntry("groupA", groupASpec), Utils.mkEntry("groupB", groupBSpec)); + } + + private void waitForRequest(MockClient mockClient, ApiKeys apiKeys) throws Exception { + TestUtils.waitForCondition(() -> { + ClientRequest clientRequest = mockClient.requests().peek(); + return clientRequest != null && clientRequest.apiKey() == apiKeys; + }, "Failed awaiting " + apiKeys + " request"); + } + + private void sendFindCoordinatorResponse(MockClient mockClient, Node coordinator) throws Exception { + waitForRequest(mockClient, 
ApiKeys.FIND_COORDINATOR); + + ClientRequest clientRequest = mockClient.requests().peek(); + FindCoordinatorRequestData data = ((FindCoordinatorRequest.Builder) clientRequest.requestBuilder()).data(); + mockClient.respond(prepareFindCoordinatorResponse(Errors.NONE, data.key(), coordinator)); + } + + private void sendOffsetFetchResponse(MockClient mockClient, Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, boolean batched, Errors error) throws Exception { + waitForRequest(mockClient, ApiKeys.OFFSET_FETCH); + + ClientRequest clientRequest = mockClient.requests().peek(); + OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data; + Map<String, Map<TopicPartition, PartitionData>> results = new HashMap<>(); + Map<String, Errors> errors = new HashMap<>(); + data.groups().forEach(group -> { + Map<TopicPartition, PartitionData> partitionResults = new HashMap<>(); + for (TopicPartition tp : groupSpecs.get(group.groupId()).topicPartitions()) { + partitionResults.put(tp, new PartitionData(10, Optional.empty(), "", Errors.NONE)); + } + results.put(group.groupId(), partitionResults); + errors.put(group.groupId(), error); + }); + if (!batched) { + assertEquals(1, data.groups().size()); + mockClient.respond(new OffsetFetchResponse(THROTTLE, error, results.values().iterator().next())); + } else + mockClient.respond(new OffsetFetchResponse(THROTTLE, errors, results)); + } + + private void verifyListOffsetsForMultipleGroups(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, + ListConsumerGroupOffsetsResult result) throws Exception { + assertEquals(groupSpecs.size(), result.all().get(10, TimeUnit.SECONDS).size()); + for (Map.Entry<String, ListConsumerGroupOffsetsSpec> entry : groupSpecs.entrySet()) { + assertEquals(entry.getValue().topicPartitions(), + result.partitionsToOffsetAndMetadata(entry.getKey()).get().keySet()); + } + } + @Test public void testDeleteConsumerGroupsNumRetries() throws Exception { final Cluster cluster = 
mockCluster(3, 0); @@ -6544,6 +6702,12 @@ public class KafkaAdminClientTest { .setLogDir(logDir)))); } + private OffsetFetchResponse offsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData) { + return new OffsetFetchResponse(THROTTLE, + Collections.singletonMap(GROUP_ID, error), + Collections.singletonMap(GROUP_ID, responseData)); + } + private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member, MemberAssignment assignment) { return new MemberDescription(member.memberId(), diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index ef858c5003..8c31c7cf69 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; +import org.apache.kafka.clients.admin.internals.CoordinatorKey; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.KafkaException; @@ -583,24 +584,29 @@ public class MockAdminClient extends AdminClient { } @Override - synchronized public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) { - // ignoring the groupId and assume one test would only work on one group only + synchronized public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options) { + // ignoring the groups and assume one test would only work on one group only + if (groupSpecs.size() != 1) + throw new UnsupportedOperationException("Not implemented yet"); + + String group = groupSpecs.keySet().iterator().next(); + Collection<TopicPartition> 
topicPartitions = groupSpecs.get(group).topicPartitions(); final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> future = new KafkaFutureImpl<>(); if (listConsumerGroupOffsetsException != null) { future.completeExceptionally(listConsumerGroupOffsetsException); } else { - if (options.topicPartitions().isEmpty()) { + if (topicPartitions.isEmpty()) { future.complete(committedOffsets.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue())))); } else { future.complete(committedOffsets.entrySet().stream() - .filter(entry -> options.topicPartitions().contains(entry.getKey())) + .filter(entry -> topicPartitions.contains(entry.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue())))); } } - return new ListConsumerGroupOffsetsResult(future); + return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future)); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java index 27597ce035..95fabb3fc2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java @@ -24,52 +24,140 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import 
org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; +import org.apache.kafka.clients.admin.internals.AdminApiHandler.RequestAndKeys; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup; +import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.OffsetFetchRequest; import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; import org.junit.jupiter.api.Test; public class ListConsumerGroupOffsetsHandlerTest { private final LogContext logContext = new LogContext(); - private final String groupId = "group-id"; + private final int throttleMs = 10; + private final String groupZero = "group0"; + private final String groupOne = "group1"; + private final String groupTwo = "group2"; + private final List<String> groups = Arrays.asList(groupZero, groupOne, groupTwo); private final TopicPartition t0p0 = new TopicPartition("t0", 0); private final TopicPartition t0p1 = new TopicPartition("t0", 1); private final TopicPartition t1p0 = new TopicPartition("t1", 0); private final TopicPartition t1p1 = new TopicPartition("t1", 1); - private final List<TopicPartition> tps = Arrays.asList(t0p0, t0p1, t1p0, t1p1); + private final TopicPartition t2p0 = new TopicPartition("t2", 0); + private final TopicPartition t2p1 = new TopicPartition("t2", 1); + private final TopicPartition t2p2 = new TopicPartition("t2", 2); + private final 
Map<String, ListConsumerGroupOffsetsSpec> singleRequestMap = Collections.singletonMap(groupZero, + new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t0p1, t1p0, t1p1))); + private final Map<String, ListConsumerGroupOffsetsSpec> batchedRequestMap = + new HashMap<String, ListConsumerGroupOffsetsSpec>() {{ + put(groupZero, new ListConsumerGroupOffsetsSpec().topicPartitions(singletonList(t0p0))); + put(groupOne, new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1))); + put(groupTwo, new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1, t2p0, t2p1, t2p2))); + }}; @Test public void testBuildRequest() { - ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext); - OffsetFetchRequest request = handler.buildBatchedRequest(1, singleton(CoordinatorKey.byGroupId(groupId))).build(); - assertEquals(groupId, request.data().groups().get(0).groupId()); + ListConsumerGroupOffsetsHandler handler = + new ListConsumerGroupOffsetsHandler(singleRequestMap, false, logContext); + OffsetFetchRequest request = handler.buildBatchedRequest(coordinatorKeys(groupZero)).build(); + assertEquals(groupZero, request.data().groups().get(0).groupId()); assertEquals(2, request.data().groups().get(0).topics().size()); assertEquals(2, request.data().groups().get(0).topics().get(0).partitionIndexes().size()); assertEquals(2, request.data().groups().get(0).topics().get(1).partitionIndexes().size()); } + @Test + public void testBuildRequestWithMultipleGroups() { + Map<String, ListConsumerGroupOffsetsSpec> requestMap = new HashMap<>(this.batchedRequestMap); + String groupThree = "group3"; + requestMap.put(groupThree, new ListConsumerGroupOffsetsSpec() + .topicPartitions(Arrays.asList(new TopicPartition("t3", 0), new TopicPartition("t3", 1)))); + + ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(requestMap, false, logContext); + OffsetFetchRequest request1 
= handler.buildBatchedRequest(coordinatorKeys(groupZero, groupOne, groupTwo)).build(); + assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo), requestGroups(request1)); + + OffsetFetchRequest request2 = handler.buildBatchedRequest(coordinatorKeys(groupThree)).build(); + assertEquals(Utils.mkSet(groupThree), requestGroups(request2)); + + Map<String, ListConsumerGroupOffsetsSpec> builtRequests = new HashMap<>(); + request1.groupIdsToPartitions().forEach((group, partitions) -> + builtRequests.put(group, new ListConsumerGroupOffsetsSpec().topicPartitions(partitions))); + request2.groupIdsToPartitions().forEach((group, partitions) -> + builtRequests.put(group, new ListConsumerGroupOffsetsSpec().topicPartitions(partitions))); + + assertEquals(requestMap, builtRequests); + Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics = request1.groupIdsToTopics(); + + assertEquals(3, groupIdsToTopics.size()); + assertEquals(1, groupIdsToTopics.get(groupZero).size()); + assertEquals(2, groupIdsToTopics.get(groupOne).size()); + assertEquals(3, groupIdsToTopics.get(groupTwo).size()); + + assertEquals(1, groupIdsToTopics.get(groupZero).get(0).partitionIndexes().size()); + assertEquals(1, groupIdsToTopics.get(groupOne).get(0).partitionIndexes().size()); + assertEquals(2, groupIdsToTopics.get(groupOne).get(1).partitionIndexes().size()); + assertEquals(1, groupIdsToTopics.get(groupTwo).get(0).partitionIndexes().size()); + assertEquals(2, groupIdsToTopics.get(groupTwo).get(1).partitionIndexes().size()); + assertEquals(3, groupIdsToTopics.get(groupTwo).get(2).partitionIndexes().size()); + + groupIdsToTopics = request2.groupIdsToTopics(); + assertEquals(1, groupIdsToTopics.size()); + assertEquals(1, groupIdsToTopics.get(groupThree).size()); + assertEquals(2, groupIdsToTopics.get(groupThree).get(0).partitionIndexes().size()); + } + + @Test + public void testBuildRequestBatchGroups() { + ListConsumerGroupOffsetsHandler handler = new 
ListConsumerGroupOffsetsHandler(batchedRequestMap, false, logContext); + Collection<RequestAndKeys<CoordinatorKey>> requests = handler.buildRequest(1, coordinatorKeys(groupZero, groupOne, groupTwo)); + assertEquals(1, requests.size()); + assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo), requestGroups((OffsetFetchRequest) requests.iterator().next().request.build())); + } + + @Test + public void testBuildRequestDoesNotBatchGroup() { + ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(batchedRequestMap, false, logContext); + // Disable batching. + ((CoordinatorStrategy) handler.lookupStrategy()).disableBatch(); + Collection<RequestAndKeys<CoordinatorKey>> requests = handler.buildRequest(1, coordinatorKeys(groupZero, groupOne, groupTwo)); + assertEquals(3, requests.size()); + assertEquals( + Utils.mkSet(Utils.mkSet(groupZero), Utils.mkSet(groupOne), Utils.mkSet(groupTwo)), + requests.stream().map(requestAndKey -> requestGroups((OffsetFetchRequest) requestAndKey.request.build())).collect(Collectors.toSet()) + ); + } + @Test public void testSuccessfulHandleResponse() { Map<TopicPartition, OffsetAndMetadata> expected = new HashMap<>(); assertCompleted(handleWithError(Errors.NONE), expected); } - @Test public void testSuccessfulHandleResponseWithOnePartitionError() { Map<TopicPartition, OffsetAndMetadata> expectedResult = Collections.singletonMap(t0p0, new OffsetAndMetadata(10L)); @@ -80,17 +168,62 @@ public class ListConsumerGroupOffsetsHandlerTest { assertCompleted(handleWithPartitionError(Errors.UNSTABLE_OFFSET_COMMIT), expectedResult); } + @Test + public void testSuccessfulHandleResponseWithOnePartitionErrorWithMultipleGroups() { + Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMapZero = + Collections.singletonMap(t0p0, new OffsetAndMetadata(10L)); + Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMapOne = + Collections.singletonMap(t1p1, new OffsetAndMetadata(10L)); + Map<TopicPartition, OffsetAndMetadata> 
offsetAndMetadataMapTwo = + Collections.singletonMap(t2p2, new OffsetAndMetadata(10L)); + Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult = + new HashMap<String, Map<TopicPartition, OffsetAndMetadata>>() {{ + put(groupZero, offsetAndMetadataMapZero); + put(groupOne, offsetAndMetadataMapOne); + put(groupTwo, offsetAndMetadataMapTwo); + }}; + + assertCompletedForMultipleGroups( + handleWithPartitionErrorMultipleGroups(Errors.UNKNOWN_TOPIC_OR_PARTITION), expectedResult); + assertCompletedForMultipleGroups( + handleWithPartitionErrorMultipleGroups(Errors.TOPIC_AUTHORIZATION_FAILED), expectedResult); + assertCompletedForMultipleGroups( + handleWithPartitionErrorMultipleGroups(Errors.UNSTABLE_OFFSET_COMMIT), expectedResult); + } + + @Test + public void testSuccessfulHandleResponseWithMultipleGroups() { + Map<String, Map<TopicPartition, OffsetAndMetadata>> expected = new HashMap<>(); + Map<String, Errors> errorMap = errorMap(groups, Errors.NONE); + assertCompletedForMultipleGroups(handleWithErrorWithMultipleGroups(errorMap, batchedRequestMap), expected); + } + @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); } + @Test + public void testUnmappedHandleResponseWithMultipleGroups() { + Map<String, Errors> errorMap = new HashMap<>(); + errorMap.put(groupZero, Errors.NOT_COORDINATOR); + errorMap.put(groupOne, Errors.COORDINATOR_NOT_AVAILABLE); + errorMap.put(groupTwo, Errors.NOT_COORDINATOR); + assertUnmappedWithMultipleGroups(handleWithErrorWithMultipleGroups(errorMap, batchedRequestMap)); + } + @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); } + @Test + public void testRetriableHandleResponseWithMultipleGroups() { + Map<String, Errors> errorMap = errorMap(groups, Errors.COORDINATOR_LOAD_IN_PROGRESS); + assertRetriable(handleWithErrorWithMultipleGroups(errorMap, 
batchedRequestMap)); + } + @Test public void testFailedHandleResponse() { assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); @@ -98,10 +231,50 @@ public class ListConsumerGroupOffsetsHandlerTest { assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID)); } + @Test + public void testFailedHandleResponseWithMultipleGroups() { + Map<String, Errors> errorMap = new HashMap<>(); + errorMap.put(groupZero, Errors.GROUP_AUTHORIZATION_FAILED); + errorMap.put(groupOne, Errors.GROUP_ID_NOT_FOUND); + errorMap.put(groupTwo, Errors.INVALID_GROUP_ID); + Map<String, Class<? extends Throwable>> groupToExceptionMap = new HashMap<>(); + groupToExceptionMap.put(groupZero, GroupAuthorizationException.class); + groupToExceptionMap.put(groupOne, GroupIdNotFoundException.class); + groupToExceptionMap.put(groupTwo, InvalidGroupIdException.class); + assertFailedForMultipleGroups(groupToExceptionMap, + handleWithErrorWithMultipleGroups(errorMap, batchedRequestMap)); + } + private OffsetFetchResponse buildResponse(Errors error) { - Map<TopicPartition, PartitionData> responseData = new HashMap<>(); - OffsetFetchResponse response = new OffsetFetchResponse(error, responseData); - return response; + return new OffsetFetchResponse( + throttleMs, + Collections.singletonMap(groupZero, error), + Collections.singletonMap(groupZero, new HashMap<>())); + } + + private OffsetFetchResponse buildResponseWithMultipleGroups( + Map<String, Errors> errorMap, + Map<String, Map<TopicPartition, PartitionData>> responseData + ) { + return new OffsetFetchResponse(throttleMs, errorMap, responseData); + } + + private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithErrorWithMultipleGroups( + Map<String, Errors> errorMap, + Map<String, ListConsumerGroupOffsetsSpec> groupSpecs + ) { + ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupSpecs, false, logContext); + 
Map<String, Map<TopicPartition, PartitionData>> responseData = new HashMap<>(); + for (String group : errorMap.keySet()) { + responseData.put(group, new HashMap<>()); + } + OffsetFetchResponse response = buildResponseWithMultipleGroups(errorMap, responseData); + return handler.handleResponse(new Node(1, "host", 1234), + errorMap.keySet() + .stream() + .map(CoordinatorKey::byGroupId) + .collect(Collectors.toSet()), + response); } private OffsetFetchResponse buildResponseWithPartitionError(Errors error) { @@ -110,24 +283,68 @@ public class ListConsumerGroupOffsetsHandlerTest { responseData.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE)); responseData.put(t0p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error)); - OffsetFetchResponse response = new OffsetFetchResponse(Errors.NONE, responseData); - return response; + return new OffsetFetchResponse(Errors.NONE, responseData); + } + + private OffsetFetchResponse buildResponseWithPartitionErrorWithMultipleGroups(Errors error) { + Map<TopicPartition, PartitionData> responseDataZero = new HashMap<>(); + responseDataZero.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE)); + + Map<TopicPartition, PartitionData> responseDataOne = new HashMap<>(); + responseDataOne.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error)); + responseDataOne.put(t1p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error)); + responseDataOne.put(t1p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE)); + + Map<TopicPartition, PartitionData> responseDataTwo = new HashMap<>(); + responseDataTwo.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error)); + responseDataTwo.put(t1p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error)); + responseDataTwo.put(t1p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error)); + 
responseDataTwo.put(t2p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error)); + responseDataTwo.put(t2p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error)); + responseDataTwo.put(t2p2, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE)); + + Map<String, Map<TopicPartition, PartitionData>> responseData = + new HashMap<String, Map<TopicPartition, PartitionData>>() {{ + put(groupZero, responseDataZero); + put(groupOne, responseDataOne); + put(groupTwo, responseDataTwo); + }}; + + Map<String, Errors> errorMap = errorMap(groups, Errors.NONE); + return new OffsetFetchResponse(0, errorMap, responseData); } private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithPartitionError( Errors error ) { - ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext); + ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(singleRequestMap, + false, logContext); OffsetFetchResponse response = buildResponseWithPartitionError(error); - return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); + return handler.handleResponse(new Node(1, "host", 1234), + singleton(CoordinatorKey.byGroupId(groupZero)), response); + } + + private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithPartitionErrorMultipleGroups( + Errors error + ) { + ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler( + batchedRequestMap, false, logContext); + OffsetFetchResponse response = buildResponseWithPartitionErrorWithMultipleGroups(error); + return handler.handleResponse( + new Node(1, "host", 1234), + coordinatorKeys(groupZero, groupOne, groupTwo), + response); } private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithError( Errors error ) { - ListConsumerGroupOffsetsHandler 
handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext); + ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler( + singleRequestMap, false, logContext); OffsetFetchResponse response = buildResponse(error); - return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); + return handler.handleResponse(new Node(1, "host", 1234), + singleton(CoordinatorKey.byGroupId(groupZero)), + response); } private void assertUnmapped( @@ -135,11 +352,19 @@ public class ListConsumerGroupOffsetsHandlerTest { ) { assertEquals(emptySet(), result.completedKeys.keySet()); assertEquals(emptySet(), result.failedKeys.keySet()); - assertEquals(singletonList(CoordinatorKey.byGroupId(groupId)), result.unmappedKeys); + assertEquals(singletonList(CoordinatorKey.byGroupId(groupZero)), result.unmappedKeys); + } + + private void assertUnmappedWithMultipleGroups( + AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result + ) { + assertEquals(emptySet(), result.completedKeys.keySet()); + assertEquals(emptySet(), result.failedKeys.keySet()); + assertEquals(coordinatorKeys(groupZero, groupOne, groupTwo), new HashSet<>(result.unmappedKeys)); } private void assertRetriable( - AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result + AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result ) { assertEquals(emptySet(), result.completedKeys.keySet()); assertEquals(emptySet(), result.failedKeys.keySet()); @@ -150,21 +375,64 @@ public class ListConsumerGroupOffsetsHandlerTest { AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result, Map<TopicPartition, OffsetAndMetadata> expected ) { - CoordinatorKey key = CoordinatorKey.byGroupId(groupId); + CoordinatorKey key = CoordinatorKey.byGroupId(groupZero); assertEquals(emptySet(), result.failedKeys.keySet()); assertEquals(emptyList(), 
result.unmappedKeys); assertEquals(singleton(key), result.completedKeys.keySet()); - assertEquals(expected, result.completedKeys.get(CoordinatorKey.byGroupId(groupId))); + assertEquals(expected, result.completedKeys.get(key)); + } + + private void assertCompletedForMultipleGroups( + AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result, + Map<String, Map<TopicPartition, OffsetAndMetadata>> expected + ) { + assertEquals(emptySet(), result.failedKeys.keySet()); + assertEquals(emptyList(), result.unmappedKeys); + for (String g : expected.keySet()) { + CoordinatorKey key = CoordinatorKey.byGroupId(g); + assertTrue(result.completedKeys.containsKey(key)); + assertEquals(expected.get(g), result.completedKeys.get(key)); + } } private void assertFailed( Class<? extends Throwable> expectedExceptionType, AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result ) { - CoordinatorKey key = CoordinatorKey.byGroupId(groupId); + CoordinatorKey key = CoordinatorKey.byGroupId(groupZero); assertEquals(emptySet(), result.completedKeys.keySet()); assertEquals(emptyList(), result.unmappedKeys); assertEquals(singleton(key), result.failedKeys.keySet()); assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key))); } + + private void assertFailedForMultipleGroups( + Map<String, Class<? extends Throwable>> groupToExceptionMap, + AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result + ) { + assertEquals(emptySet(), result.completedKeys.keySet()); + assertEquals(emptyList(), result.unmappedKeys); + for (String g : groupToExceptionMap.keySet()) { + CoordinatorKey key = CoordinatorKey.byGroupId(g); + assertTrue(result.failedKeys.containsKey(key)); + assertTrue(groupToExceptionMap.get(g).isInstance(result.failedKeys.get(key))); + } + } + + private Set<CoordinatorKey> coordinatorKeys(String... 
groups) { + return Stream.of(groups) + .map(CoordinatorKey::byGroupId) + .collect(Collectors.toSet()); + } + + private Set<String> requestGroups(OffsetFetchRequest request) { + return request.data().groups() + .stream() + .map(OffsetFetchRequestGroup::groupId) + .collect(Collectors.toSet()); + } + + private Map<String, Errors> errorMap(Collection<String> groups, Errors error) { + return groups.stream().collect(Collectors.toMap(Function.identity(), unused -> error)); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index da3acf4983..e7f25345c6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -172,6 +172,7 @@ public class KafkaConsumerTest { // Set auto commit interval lower than heartbeat so we don't need to deal with // a concurrent heartbeat request private final int autoCommitIntervalMs = 500; + private final int throttleMs = 10; private final String groupId = "mock-group"; private final String memberId = "memberId"; @@ -2434,7 +2435,10 @@ public class KafkaConsumerTest { partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(), Optional.empty(), "", error)); } - return new OffsetFetchResponse(Errors.NONE, partitionData); + return new OffsetFetchResponse( + throttleMs, + Collections.singletonMap(groupId, Errors.NONE), + Collections.singletonMap(groupId, partitionData)); } private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long> offsets) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index c65d33176f..db483c6c0f 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -71,6 +71,7 @@ import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.OffsetFetchResponse; +import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData; import org.apache.kafka.common.requests.RequestTestUtils; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; @@ -140,6 +141,7 @@ public abstract class ConsumerCoordinatorTest { private final long retryBackoffMs = 100; private final int autoCommitIntervalMs = 2000; private final int requestTimeoutMs = 30000; + private final int throttleMs = 10; private final MockTime time = new MockTime(); private GroupRebalanceConfig rebalanceConfig; @@ -2872,7 +2874,7 @@ public abstract class ConsumerCoordinatorTest { OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch, metadata, Errors.NONE); - client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data))); + client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data))); Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p), time.timer(Long.MAX_VALUE)); @@ -2888,7 +2890,7 @@ public abstract class ConsumerCoordinatorTest { OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(-1, Optional.empty(), "", Errors.TOPIC_AUTHORIZATION_FAILED); - client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data))); + client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data))); TopicAuthorizationException exception = 
assertThrows(TopicAuthorizationException.class, () -> coordinator.fetchCommittedOffsets(singleton(t1p), time.timer(Long.MAX_VALUE))); @@ -2901,7 +2903,7 @@ public abstract class ConsumerCoordinatorTest { coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); subscriptions.assignFromUser(singleton(t1p)); - client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS)); + client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap())); client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L)); coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE)); @@ -2916,7 +2918,7 @@ public abstract class ConsumerCoordinatorTest { coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); subscriptions.assignFromUser(singleton(t1p)); - client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED)); + client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED, Collections.emptyMap())); try { coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE)); fail("Expected group authorization error"); @@ -2959,7 +2961,7 @@ public abstract class ConsumerCoordinatorTest { coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); subscriptions.assignFromUser(singleton(t1p)); - client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR)); + client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L)); coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE)); @@ -3435,7 +3437,11 @@ public abstract class ConsumerCoordinatorTest { OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch, metadata, Errors.NONE); - client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data))); + if 
(upperVersion < 8) { + client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data))); + } else { + client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data))); + } if (expectThrows) { assertThrows(UnsupportedVersionException.class, () -> coordinator.fetchCommittedOffsets(singleton(t1p), time.timer(Long.MAX_VALUE))); @@ -3690,8 +3696,10 @@ public abstract class ConsumerCoordinatorTest { return new OffsetCommitResponse(responseData); } - private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) { - return new OffsetFetchResponse(topLevelError, Collections.emptyMap()); + private OffsetFetchResponse offsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData) { + return new OffsetFetchResponse(throttleMs, + singletonMap(groupId, error), + singletonMap(groupId, responseData)); } private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset) { @@ -3701,7 +3709,7 @@ public abstract class ConsumerCoordinatorTest { private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset, Optional<Integer> epoch) { OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, epoch, metadata, partitionLevelError); - return new OffsetFetchResponse(Errors.NONE, singletonMap(tp, data)); + return offsetFetchResponse(Errors.NONE, singletonMap(tp, data)); } private OffsetCommitCallback callback(final AtomicBoolean success) { diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 47c1d173b3..d5aee881c9 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -18,7 +18,7 @@ package kafka.admin import java.time.{Duration, Instant} -import java.util.Properties +import java.util.{Collections, Properties} 
import com.fasterxml.jackson.dataformat.csv.CsvMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import kafka.utils._ @@ -753,9 +753,9 @@ object ConsumerGroupCommand extends Logging { private def getCommittedOffsets(groupId: String): Map[TopicPartition, OffsetAndMetadata] = { adminClient.listConsumerGroupOffsets( - groupId, - withTimeoutMs(new ListConsumerGroupOffsetsOptions) - ).partitionsToOffsetAndMetadata.get.asScala + Collections.singletonMap(groupId, new ListConsumerGroupOffsetsSpec), + withTimeoutMs(new ListConsumerGroupOffsetsOptions()) + ).partitionsToOffsetAndMetadata(groupId).get().asScala } type GroupMetadata = immutable.Map[String, immutable.Map[TopicPartition, OffsetAndMetadata]] diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala index 76a3855a87..44b241a7ed 100644 --- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala @@ -49,8 +49,8 @@ class ConsumerGroupServiceTest { when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())) .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE)) - when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())) - .thenReturn(listGroupOffsetsResult) + when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any())) + .thenReturn(listGroupOffsetsResult(group)) when(admin.listOffsets(offsetsArgMatcher, any())) .thenReturn(listOffsetsResult) @@ -60,7 +60,7 @@ class ConsumerGroupServiceTest { assertEquals(topicPartitions.size, assignments.get.size) verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()) - verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()) + verify(admin, 
times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any()) verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } @@ -112,8 +112,10 @@ class ConsumerGroupServiceTest { future.complete(consumerGroupDescription) when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())) .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, future))) - when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())) - .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets)) + when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any())) + .thenReturn( + AdminClientTestUtils.listConsumerGroupOffsetsResult( + Collections.singletonMap(group, commitedOffsets))) when(admin.listOffsets( ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)), any() @@ -142,7 +144,7 @@ class ConsumerGroupServiceTest { assertEquals(expectedOffsets, returnedOffsets) verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()) - verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()) + verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any()) verify(admin, times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)), any()) verify(admin, times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)), any()) } @@ -192,9 +194,9 @@ class ConsumerGroupServiceTest { new DescribeConsumerGroupsResult(Collections.singletonMap(group, future)) } - private def listGroupOffsetsResult: ListConsumerGroupOffsetsResult = { + private def listGroupOffsetsResult(groupId: String): ListConsumerGroupOffsetsResult = { val offsets = topicPartitions.map(_ -> new OffsetAndMetadata(100)).toMap.asJava - AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets) + 
AdminClientTestUtils.listConsumerGroupOffsetsResult(Map(groupId -> offsets).asJava) } private def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = { @@ -217,4 +219,8 @@ class ConsumerGroupServiceTest { }.toMap AdminClientTestUtils.describeTopicsResult(topicDescriptions.asJava) } + + private def listConsumerGroupOffsetsSpec: util.Map[String, ListConsumerGroupOffsetsSpec] = { + Collections.singletonMap(group, new ListConsumerGroupOffsetsSpec()) + } } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 6d17e93782..82c19949e3 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -320,7 +320,7 @@ class RequestQuotaTest extends BaseRequestTest { ) ) case ApiKeys.OFFSET_FETCH => - new OffsetFetchRequest.Builder("test-group", false, List(tp).asJava, false) + new OffsetFetchRequest.Builder(Map("test-group"-> List(tp).asJava).asJava, false, false) case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest.Builder( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 02cfb0b49c..5240534ce7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; import org.apache.kafka.clients.admin.ListOffsetsOptions; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.Consumer; @@ -695,11 +696,12 @@ public class 
StoreChangelogReader implements ChangelogReader { try { // those which do not have a committed offset would default to 0 - final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions(); - options.topicPartitions(new ArrayList<>(partitions)); - options.requireStable(true); - final Map<TopicPartition, Long> committedOffsets = adminClient.listConsumerGroupOffsets(groupId, options) - .partitionsToOffsetAndMetadata().get().entrySet() + final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions() + .requireStable(true); + final ListConsumerGroupOffsetsSpec spec = new ListConsumerGroupOffsetsSpec() + .topicPartitions(new ArrayList<>(partitions)); + final Map<TopicPartition, Long> committedOffsets = adminClient.listConsumerGroupOffsets(Collections.singletonMap(groupId, spec)) + .partitionsToOffsetAndMetadata(groupId).get().entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset())); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 1961736620..fbc8d42326 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.admin.AdminClientTestUtils; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; import org.apache.kafka.clients.admin.ListOffsetsOptions; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.MockAdminClient; @@ -648,12 +649,12 @@ public class 
StoreChangelogReaderTest extends EasyMockSupport { final AtomicBoolean functionCalled = new AtomicBoolean(false); final MockAdminClient adminClient = new MockAdminClient() { @Override - public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) { + public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, final ListConsumerGroupOffsetsOptions options) { if (functionCalled.get()) { - return super.listConsumerGroupOffsets(groupId, options); + return super.listConsumerGroupOffsets(groupSpecs, options); } else { functionCalled.set(true); - return AdminClientTestUtils.listConsumerGroupOffsetsResult(new TimeoutException("KABOOM!")); + return AdminClientTestUtils.listConsumerGroupOffsetsResult(groupSpecs.keySet().iterator().next(), new TimeoutException("KABOOM!")); } } }; @@ -708,7 +709,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { final MockAdminClient adminClient = new MockAdminClient() { @Override - public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) { + public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, final ListConsumerGroupOffsetsOptions options) { throw kaboom; } }; @@ -790,7 +791,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { final MockAdminClient adminClient = new MockAdminClient() { @Override - public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) { + public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, final ListConsumerGroupOffsetsOptions options) { throw new AssertionError("Should not try to fetch committed 
offsets"); } };