sjhajharia commented on code in PR #22339: URL: https://github.com/apache/kafka/pull/22339#discussion_r3285942806
########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTestBase.java: ########## @@ -0,0 +1,1379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.ClientDnsLookup; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.MetadataRecoveryStrategy; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.GroupState; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.feature.Features; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirPartitionResult; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.ApiVersionsResponseData; +import org.apache.kafka.common.message.CreatePartitionsResponseData; +import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResultCollection; +import org.apache.kafka.common.message.DeleteTopicsResponseData; +import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; +import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResultCollection; +import org.apache.kafka.common.message.DescribeClusterResponseData; +import org.apache.kafka.common.message.DescribeClusterResponseData.DescribeClusterBroker; +import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; +import org.apache.kafka.common.message.DescribeLogDirsResponseData; +import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic; +import org.apache.kafka.common.message.DescribeProducersResponseData; +import org.apache.kafka.common.message.DescribeQuorumResponseData; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; +import org.apache.kafka.common.message.ListConfigResourcesResponseData; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic; +import org.apache.kafka.common.message.OffsetDeleteResponseData; +import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition; +import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection; +import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic; +import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection; +import org.apache.kafka.common.message.OffsetFetchRequestData; +import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup; +import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics; +import org.apache.kafka.common.message.OffsetFetchResponseData; +import org.apache.kafka.common.message.ShareGroupDescribeResponseData; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; +import org.apache.kafka.common.message.UnregisterBrokerResponseData; +import org.apache.kafka.common.message.WriteTxnMarkersResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.quota.ClientQuotaFilter; +import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.CreatePartitionsRequest; +import org.apache.kafka.common.requests.CreatePartitionsResponse; +import org.apache.kafka.common.requests.CreateTopicsRequest; +import org.apache.kafka.common.requests.CreateTopicsResponse; +import org.apache.kafka.common.requests.DeleteTopicsRequest; +import org.apache.kafka.common.requests.DeleteTopicsResponse; +import org.apache.kafka.common.requests.DescribeClusterResponse; +import org.apache.kafka.common.requests.DescribeGroupsResponse; +import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.apache.kafka.common.requests.DescribeProducersResponse; +import org.apache.kafka.common.requests.DescribeQuorumResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.JoinGroupRequest; +import org.apache.kafka.common.requests.LeaveGroupRequest; +import org.apache.kafka.common.requests.LeaveGroupResponse; +import org.apache.kafka.common.requests.ListConfigResourcesResponse; +import org.apache.kafka.common.requests.ListGroupsRequest; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetCommitResponse; +import org.apache.kafka.common.requests.OffsetDeleteResponse; +import org.apache.kafka.common.requests.OffsetFetchRequest; +import org.apache.kafka.common.requests.OffsetFetchResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.requests.UnregisterBrokerResponse; +import org.apache.kafka.common.requests.UpdateFeaturesRequest; +import org.apache.kafka.common.requests.UpdateFeaturesResponse; +import org.apache.kafka.common.requests.WriteTxnMarkersResponse; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.params.provider.Arguments; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Arrays.asList; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Shared scaffolding for KafkaAdminClient unit tests. The test methods themselves live in + * {@link KafkaAdminClientTest} and the per-domain subclasses (topic, consumer-group, streams-group, + * share-group, config, transaction). Helpers and constants are placed here so they're available + * to every subclass without duplication. + */ +public abstract class KafkaAdminClientTestBase { + protected static final Logger log = LoggerFactory.getLogger(KafkaAdminClientTestBase.class); + protected static final String GROUP_ID = "group-0"; + public static final Uuid REPLICA_DIRECTORY_ID = Uuid.randomUuid(); + + protected static Map<String, Object> newStrMap(String... vals) { + Map<String, Object> map = new HashMap<>(); + map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121"); + map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"); + if (vals.length % 2 != 0) { + throw new IllegalStateException(); + } + for (int i = 0; i < vals.length; i += 2) { + map.put(vals[i], vals[i + 1]); + } + return map; + } + + protected static AdminClientConfig newConfMap(String... vals) { + return new AdminClientConfig(newStrMap(vals)); + } + + protected static Cluster mockCluster(int numNodes, int controllerIndex) { + HashMap<Integer, Node> nodes = new HashMap<>(); + for (int i = 0; i < numNodes; i++) + nodes.put(i, new Node(i, "localhost", 8121 + i)); + return new Cluster("mockClusterId", nodes.values(), + Collections.emptySet(), Collections.emptySet(), + Collections.emptySet(), nodes.get(controllerIndex)); + } + + protected static Cluster mockBootstrapCluster() { + return Cluster.bootstrap(ClientUtils.parseAndValidateAddresses( + singletonList("localhost:8121"), ClientDnsLookup.USE_ALL_DNS_IPS)); + } + + protected static AdminClientUnitTestEnv mockClientEnv(String... configVals) { + return new AdminClientUnitTestEnv(mockCluster(3, 0), configVals); + } + + protected static AdminClientUnitTestEnv mockClientEnv(Time time, String... configVals) { + return new AdminClientUnitTestEnv(time, mockCluster(3, 0), configVals); + } + + protected static OffsetDeleteResponse prepareOffsetDeleteResponse(Errors error) { + return new OffsetDeleteResponse( + new OffsetDeleteResponseData() + .setErrorCode(error.code()) + .setTopics(new OffsetDeleteResponseTopicCollection()) + ); + } + + protected static OffsetDeleteResponse prepareOffsetDeleteResponse(String topic, int partition, Errors error) { + return new OffsetDeleteResponse( + new OffsetDeleteResponseData() + .setErrorCode(Errors.NONE.code()) + .setTopics(new OffsetDeleteResponseTopicCollection(Stream.of( + new OffsetDeleteResponseTopic() + .setName(topic) + .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( + new OffsetDeleteResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(error.code()) + ))) + ).collect(Collectors.toList()))) + ); + } + + protected static OffsetCommitResponse prepareOffsetCommitResponse(TopicPartition tp, Errors error) { + Map<TopicPartition, Errors> responseData = new HashMap<>(); + responseData.put(tp, error); + return new OffsetCommitResponse(0, responseData); + } + + protected static CreateTopicsResponse prepareCreateTopicsResponse(String topicName, Errors error) { + CreateTopicsResponseData data = new CreateTopicsResponseData(); + data.topics().add(new CreatableTopicResult() + .setName(topicName) + .setErrorCode(error.code())); + return new CreateTopicsResponse(data); + } + + public static CreateTopicsResponse prepareCreateTopicsResponse(int throttleTimeMs, CreatableTopicResult... topics) { + CreateTopicsResponseData data = new CreateTopicsResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setTopics(new CreatableTopicResultCollection(Arrays.asList(topics))); + return new CreateTopicsResponse(data); + } + + public static CreatableTopicResult creatableTopicResult(String name, Errors error) { + return new CreatableTopicResult() + .setName(name) + .setErrorCode(error.code()); + } + + public static DeleteTopicsResponse prepareDeleteTopicsResponse(int throttleTimeMs, DeletableTopicResult... topics) { + DeleteTopicsResponseData data = new DeleteTopicsResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setResponses(new DeletableTopicResultCollection(Arrays.asList(topics))); + return new DeleteTopicsResponse(data); + } + + public static DeletableTopicResult deletableTopicResult(String topicName, Errors error) { + return new DeletableTopicResult() + .setName(topicName) + .setErrorCode(error.code()); + } + + public static DeletableTopicResult deletableTopicResultWithId(Uuid topicId, Errors error) { + return new DeletableTopicResult() + .setTopicId(topicId) + .setErrorCode(error.code()); + } + + public static CreatePartitionsResponse prepareCreatePartitionsResponse(int throttleTimeMs, CreatePartitionsTopicResult... topics) { + CreatePartitionsResponseData data = new CreatePartitionsResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setResults(asList(topics)); + return new CreatePartitionsResponse(data); + } + + public static CreatePartitionsTopicResult createPartitionsTopicResult(String name, Errors error) { + return createPartitionsTopicResult(name, error, null); + } + + public static CreatePartitionsTopicResult createPartitionsTopicResult(String name, Errors error, String errorMessage) { + return new CreatePartitionsTopicResult() + .setName(name) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage); + } + + protected static DeleteTopicsResponse prepareDeleteTopicsResponse(String topicName, Errors error) { + DeleteTopicsResponseData data = new DeleteTopicsResponseData(); + data.responses().add(new DeletableTopicResult() + .setName(topicName) + .setErrorCode(error.code())); + return new DeleteTopicsResponse(data); + } + + protected static DeleteTopicsResponse prepareDeleteTopicsResponseWithTopicId(Uuid id, Errors error) { + DeleteTopicsResponseData data = new DeleteTopicsResponseData(); + data.responses().add(new DeletableTopicResult() + .setTopicId(id) + .setErrorCode(error.code())); + return new DeleteTopicsResponse(data); + } + + protected static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors error, Node node) { + return prepareFindCoordinatorResponse(error, GROUP_ID, node); + } + + protected static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors error, String key, Node node) { + return FindCoordinatorResponse.prepareResponse(error, key, node); + } + + protected static FindCoordinatorResponse prepareOldFindCoordinatorResponse(Errors error, Node node) { + return FindCoordinatorResponse.prepareOldResponse(error, node); + } + + protected static FindCoordinatorResponse prepareBatchedFindCoordinatorResponse(Errors error, Node node, Collection<String> groups) { + FindCoordinatorResponseData data = new FindCoordinatorResponseData(); + List<FindCoordinatorResponseData.Coordinator> coordinators = groups.stream() + .map(group -> new FindCoordinatorResponseData.Coordinator() + .setErrorCode(error.code()) + .setErrorMessage(error.message()) + .setKey(group) + .setHost(node.host()) + .setPort(node.port()) + .setNodeId(node.id())) + .collect(Collectors.toList()); + data.setCoordinators(coordinators); + return new FindCoordinatorResponse(data); + } + + protected static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) { + return prepareMetadataResponse(cluster, error, error); + } + + protected static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) { + List<MetadataResponseTopic> metadata = new ArrayList<>(); + for (String topic : cluster.topics()) { + List<MetadataResponsePartition> pms = new ArrayList<>(); + for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) { + MetadataResponsePartition pm = new MetadataResponsePartition() + .setErrorCode(partitionError.code()) + .setPartitionIndex(pInfo.partition()) + .setLeaderId(pInfo.leader().id()) + .setLeaderEpoch(234) + .setReplicaNodes(Arrays.stream(pInfo.replicas()).map(Node::id).collect(Collectors.toList())) + .setIsrNodes(Arrays.stream(pInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toList())) + .setOfflineReplicas(Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList())); + pms.add(pm); + } + MetadataResponseTopic tm = new MetadataResponseTopic() + .setErrorCode(topicError.code()) + .setName(topic) + .setIsInternal(false) + .setPartitions(pms); + metadata.add(tm); + } + return MetadataResponse.prepareResponse(true, + 0, + cluster.nodes(), + cluster.clusterResource().clusterId(), + cluster.controller().id(), + metadata, + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED); + } + + protected static DescribeGroupsResponseData prepareDescribeGroupsResponseData(String groupId, + List<String> groupInstances, + List<TopicPartition> topicPartitions) { + final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions)); + List<DescribedGroupMember> describedGroupMembers = groupInstances.stream().map(groupInstance -> DescribeGroupsResponse.groupMember(JoinGroupRequest.UNKNOWN_MEMBER_ID, + groupInstance, "clientId0", "clientHost", new byte[memberAssignment.remaining()], null)).collect(Collectors.toList()); + DescribeGroupsResponseData data = new DescribeGroupsResponseData(); + data.groups().add(DescribeGroupsResponse.groupMetadata( + groupId, + Errors.NONE, + "", + ConsumerProtocol.PROTOCOL_TYPE, + "", + describedGroupMembers, + Collections.emptySet())); + return data; + } + + protected static FeatureMetadata defaultFeatureMetadata() { + return new FeatureMetadata( + Map.of("test_feature_1", new FinalizedVersionRange((short) 2, (short) 2)), + Optional.of(1L), + Map.of("test_feature_1", new SupportedVersionRange((short) 1, (short) 5))); + } + + protected static Features<org.apache.kafka.common.feature.SupportedVersionRange> convertSupportedFeaturesMap(Map<String, SupportedVersionRange> features) { + final Map<String, org.apache.kafka.common.feature.SupportedVersionRange> featuresMap = new HashMap<>(); + for (final Map.Entry<String, SupportedVersionRange> entry : features.entrySet()) { + final SupportedVersionRange versionRange = entry.getValue(); + featuresMap.put( + entry.getKey(), + new org.apache.kafka.common.feature.SupportedVersionRange(versionRange.minVersion(), + versionRange.maxVersion())); + } + + return Features.supportedFeatures(featuresMap); + } + + protected static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures(Errors error) { + if (error == Errors.NONE) { + return new ApiVersionsResponse.Builder(). + setApiVersions(ApiVersionsResponse.filterApis( + ApiMessageType.ListenerType.BROKER, false, false)). + setSupportedFeatures( + convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures())). + setFinalizedFeatures( + Collections.singletonMap("test_feature_1", (short) 2)). + setFinalizedFeaturesEpoch( + defaultFeatureMetadata().finalizedFeaturesEpoch().get()). + build(); + } + return new ApiVersionsResponse( + new ApiVersionsResponseData() + .setThrottleTimeMs(0) + .setErrorCode(error.code())); + } + + protected static QuorumInfo defaultQuorumInfo(boolean emptyOptionals) { + return new QuorumInfo(1, 1, 1L, + singletonList(new QuorumInfo.ReplicaState(1, + emptyOptionals ? Uuid.ZERO_UUID : REPLICA_DIRECTORY_ID, + 100, + emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000), + emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000))), + singletonList(new QuorumInfo.ReplicaState(1, + emptyOptionals ? Uuid.ZERO_UUID : REPLICA_DIRECTORY_ID, + 100, + emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000), + emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000))), + singletonMap(1, new QuorumInfo.Node(1, Collections.emptyList()))); + } + + protected static DescribeQuorumResponse prepareDescribeQuorumResponse( + Errors topLevelError, + Errors partitionLevelError, + Boolean topicCountError, + Boolean topicNameError, + Boolean partitionCountError, + Boolean partitionIndexError, + Boolean emptyOptionals) { + String topicName = topicNameError ? "RANDOM" : Topic.CLUSTER_METADATA_TOPIC_NAME; + int partitionIndex = partitionIndexError ? 1 : Topic.CLUSTER_METADATA_TOPIC_PARTITION.partition(); + List<DescribeQuorumResponseData.TopicData> topics = new ArrayList<>(); + List<DescribeQuorumResponseData.PartitionData> partitions = new ArrayList<>(); + for (int i = 0; i < (partitionCountError ? 2 : 1); i++) { + DescribeQuorumResponseData.ReplicaState replica = new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(1) + .setReplicaDirectoryId(emptyOptionals ? Uuid.ZERO_UUID : REPLICA_DIRECTORY_ID) + .setLogEndOffset(100); + replica.setLastFetchTimestamp(emptyOptionals ? -1 : 1000); + replica.setLastCaughtUpTimestamp(emptyOptionals ? -1 : 1000); + partitions.add(new DescribeQuorumResponseData.PartitionData().setPartitionIndex(partitionIndex) + .setLeaderId(1) + .setLeaderEpoch(1) + .setHighWatermark(1) + .setCurrentVoters(singletonList(replica)) + .setObservers(singletonList(replica)) + .setErrorCode(partitionLevelError.code()) + .setErrorMessage(partitionLevelError.message())); + } + for (int i = 0; i < (topicCountError ? 2 : 1); i++) { + topics.add(new DescribeQuorumResponseData.TopicData().setTopicName(topicName).setPartitions(partitions)); + } + return new DescribeQuorumResponse(new DescribeQuorumResponseData() + .setTopics(topics) + .setErrorCode(topLevelError.code()) + .setErrorMessage(topLevelError.message()) + .setNodes(new DescribeQuorumResponseData.NodeCollection(Collections.singleton(new DescribeQuorumResponseData.Node().setNodeId(1))))); + } + + protected void verifyUnreachableBootstrapServer(MetadataRecoveryStrategy metadataRecoveryStrategy) throws Exception { + // This tests the scenario in which the bootstrap server is unreachable for a short while, + // which prevents AdminClient from being able to send the initial metadata request + + Cluster cluster = Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 8121))); + Map<Node, Long> unreachableNodes = Collections.singletonMap(cluster.nodes().get(0), 200L); + try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster, + AdminClientUnitTestEnv.clientConfigs(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG, metadataRecoveryStrategy.name), unreachableNodes)) { + Cluster discoveredCluster = mockCluster(3, 0); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest, + RequestTestUtils.metadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(), + 1, Collections.emptyList())); + if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) { + env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest, + RequestTestUtils.metadataResponse(discoveredCluster.nodes(), + discoveredCluster.clusterResource().clusterId(), 1, Collections.emptyList())); + } + env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest, + prepareCreateTopicsResponse("myTopic", Errors.NONE)); + + KafkaFuture<Void> future = env.adminClient().createTopics( + singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), + new CreateTopicsOptions().timeoutMs(10000)).all(); + + future.get(); + } + } + + protected MockClient.RequestMatcher expectCreateTopicsRequestWithTopics(final String... topics) { + return body -> { + if (body instanceof CreateTopicsRequest) { + CreateTopicsRequest request = (CreateTopicsRequest) body; + for (String topic : topics) { + if (request.data().topics().find(topic) == null) + return false; + } + return topics.length == request.data().topics().size(); + } + return false; + }; + } + + protected MockClient.RequestMatcher expectDeleteTopicsRequestWithTopics(final String... topics) { + return body -> { + if (body instanceof DeleteTopicsRequest) { + DeleteTopicsRequest request = (DeleteTopicsRequest) body; + return request.topicNames().equals(asList(topics)); + } + return false; + }; + } + + protected MockClient.RequestMatcher expectDeleteTopicsRequestWithTopicIds(final Uuid... topicIds) { + return body -> { + if (body instanceof DeleteTopicsRequest) { + DeleteTopicsRequest request = (DeleteTopicsRequest) body; + return request.topicIds().equals(asList(topicIds)); + } + return false; + }; + } + + protected void addPartitionToDescribeTopicPartitionsResponse( + DescribeTopicPartitionsResponseData data, + String topicName, + Uuid topicId, + List<Integer> partitions) { + List<DescribeTopicPartitionsResponsePartition> addingPartitions = new ArrayList<>(); + partitions.forEach(partition -> + addingPartitions.add(new DescribeTopicPartitionsResponsePartition() + .setIsrNodes(singletonList(0)) + .setErrorCode((short) 0) + .setLeaderEpoch(0) + .setLeaderId(0) + .setEligibleLeaderReplicas(singletonList(1)) + .setLastKnownElr(singletonList(2)) + .setPartitionIndex(partition) + .setReplicaNodes(asList(0, 1, 2))) + ); + data.topics().add(new DescribeTopicPartitionsResponseTopic() + .setErrorCode((short) 0) + .setTopicId(topicId) + .setName(topicName) + .setIsInternal(false) + .setPartitions(addingPartitions)); + } + + protected void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) { + ExecutionException e = assertThrows(ExecutionException.class, () -> env.adminClient().createTopics( + singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), + new CreateTopicsOptions().timeoutMs(10000)).all().get()); + assertInstanceOf(AuthenticationException.class, e.getCause(), + "Expected an authentication error, but got " + Utils.stackTrace(e)); + + Map<String, NewPartitions> counts = new HashMap<>(); + counts.put("my_topic", NewPartitions.increaseTo(3)); + counts.put("other_topic", NewPartitions.increaseTo(3, asList(singletonList(2), singletonList(3)))); + e = assertThrows(ExecutionException.class, () -> env.adminClient().createPartitions(counts).all().get()); + assertInstanceOf(AuthenticationException.class, e.getCause(), + "Expected an authentication error, but got " + Utils.stackTrace(e)); + + e = assertThrows(ExecutionException.class, () -> env.adminClient().createAcls(asList(ACL1, ACL2)).all().get()); + assertInstanceOf(AuthenticationException.class, e.getCause(), + "Expected an authentication error, but got " + Utils.stackTrace(e)); + + e = assertThrows(ExecutionException.class, () -> env.adminClient().describeAcls(FILTER1).values().get()); + assertInstanceOf(AuthenticationException.class, e.getCause(), + "Expected an authentication error, but got " + Utils.stackTrace(e)); + + e = assertThrows(ExecutionException.class, () -> env.adminClient().deleteAcls(asList(FILTER1, FILTER2)).all().get()); + assertInstanceOf(AuthenticationException.class, e.getCause(), + "Expected an authentication error, but got " + Utils.stackTrace(e)); + + e = assertThrows(ExecutionException.class, () -> env.adminClient().describeConfigs( + singleton(new ConfigResource(ConfigResource.Type.BROKER, "0"))).all().get()); + assertInstanceOf(AuthenticationException.class, e.getCause(), + "Expected an authentication error, but got " + Utils.stackTrace(e)); + } + + protected void callClientQuotasApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) { + ExecutionException e = assertThrows(ExecutionException.class, + () -> env.adminClient().describeClientQuotas(ClientQuotaFilter.all()).entities().get()); + assertInstanceOf(AuthenticationException.class, e.getCause(), + "Expected an authentication error, but got " + Utils.stackTrace(e)); + + ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user")); + ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity, singletonList(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0))); + e = assertThrows(ExecutionException.class, + () -> env.adminClient().alterClientQuotas(singletonList(alteration)).all().get()); + + assertInstanceOf(AuthenticationException.class, e.getCause(), + "Expected an authentication error, but got " + Utils.stackTrace(e)); + } + + protected static final AclBinding ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)); + protected static final AclBinding ACL2 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic4", PatternType.LITERAL), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY)); + protected static final AclBindingFilter FILTER1 = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, null, PatternType.LITERAL), + new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY)); + protected static final AclBindingFilter FILTER2 = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, null, PatternType.LITERAL), + new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY)); + protected static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter( + new ResourcePatternFilter(ResourceType.UNKNOWN, null, PatternType.LITERAL), + new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY)); + + protected static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) { + return prepareDescribeLogDirsResponse(error, logDir, + prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false)); + } + + protected static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag, long totalBytes, long usableBytes) { + return prepareDescribeLogDirsResponse(error, logDir, + prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false), totalBytes, usableBytes, false); + } + + protected static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag, long totalBytes, long usableBytes, boolean isCordoned) { + return prepareDescribeLogDirsResponse(error, logDir, + prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false), totalBytes, usableBytes, isCordoned); + } + + protected static List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics( + long partitionSize, long offsetLag, String topic, int partition, boolean isFuture) { + return singletonList(new DescribeLogDirsTopic() + .setName(topic) + .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition() + .setPartitionIndex(partition) + .setPartitionSize(partitionSize) + .setIsFutureKey(isFuture) + .setOffsetLag(offsetLag)))); + } + + protected static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, + List<DescribeLogDirsTopic> topics) { + return new DescribeLogDirsResponse( + new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(error.code()) + .setLogDir(logDir) + .setTopics(topics) + ))); + } + + protected static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, + List<DescribeLogDirsTopic> topics, + long totalBytes, long usableBytes, + boolean isCordoned) { + return new DescribeLogDirsResponse( + new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(error.code()) + .setLogDir(logDir) + .setTopics(topics) + .setTotalBytes(totalBytes) + .setUsableBytes(usableBytes) + .setIsCordoned(isCordoned) + ))); + } + + protected static DescribeLogDirsResponse prepareEmptyDescribeLogDirsResponse(Optional<Errors> error) { + DescribeLogDirsResponseData data = new DescribeLogDirsResponseData(); + error.ifPresent(e -> data.setErrorCode(e.code())); + return new DescribeLogDirsResponse(data); + } + + protected static void assertDescriptionContains(Map<String, LogDirDescription> descriptionsMap, String logDir, + TopicPartition tp, long partitionSize, long offsetLag) { + assertDescriptionContains(descriptionsMap, logDir, tp, partitionSize, offsetLag, OptionalLong.empty(), OptionalLong.empty()); + } + + protected static void assertDescriptionContains(Map<String, LogDirDescription> descriptionsMap, String logDir, + TopicPartition tp, long partitionSize, long offsetLag, OptionalLong totalBytes, OptionalLong usableBytes) { + assertNotNull(descriptionsMap); + assertEquals(singleton(logDir), descriptionsMap.keySet()); + assertNull(descriptionsMap.get(logDir).error()); + Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get(logDir).replicaInfos(); + assertEquals(singleton(tp), descriptionsReplicaInfos.keySet()); + assertEquals(partitionSize, descriptionsReplicaInfos.get(tp).size()); + assertEquals(offsetLag, descriptionsReplicaInfos.get(tp).offsetLag()); + assertFalse(descriptionsReplicaInfos.get(tp).isFuture()); + assertEquals(totalBytes, descriptionsMap.get(logDir).totalBytes()); + assertEquals(usableBytes, descriptionsMap.get(logDir).usableBytes()); + assertFalse(descriptionsMap.get(logDir).isCordoned()); + } + + protected static DescribeLogDirsResponseData.DescribeLogDirsResult prepareDescribeLogDirsResult(TopicPartitionReplica tpr, String logDir, int partitionSize, int offsetLag, boolean isFuture) { + return new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(Errors.NONE.code()) + .setLogDir(logDir) + .setTopics(prepareDescribeLogDirsTopics(partitionSize, offsetLag, tpr.topic(), tpr.partition(), isFuture)); + } + + protected MockClient.RequestMatcher expectCreatePartitionsRequestWithTopics(final String... topics) { + return body -> { + if (body instanceof CreatePartitionsRequest) { + CreatePartitionsRequest request = (CreatePartitionsRequest) body; + for (String topic : topics) { + if (request.data().topics().find(topic) == null) + return false; + } + return topics.length == request.data().topics().size(); + } + return false; + }; + } + + protected static DescribeClusterResponse prepareDescribeClusterResponse( + int throttleTimeMs, + Collection<Node> brokers, + String clusterId, + int controllerId, + int clusterAuthorizedOperations, + boolean sentToController + ) { + DescribeClusterResponseData data = new DescribeClusterResponseData() + .setErrorCode(Errors.NONE.code()) + .setThrottleTimeMs(throttleTimeMs) + .setControllerId(controllerId) + .setClusterId(clusterId) + .setClusterAuthorizedOperations(clusterAuthorizedOperations); + + if (sentToController) { + data.setEndpointType(EndpointType.CONTROLLER.id()); + } + + brokers.forEach(broker -> + data.brokers().add(new DescribeClusterBroker() + .setHost(broker.host()) + .setPort(broker.port()) + .setBrokerId(broker.id()) + .setRack(broker.rack()))); + + return new DescribeClusterResponse(data); + } + + protected MockClient.RequestMatcher expectListGroupsRequestWithFilters( + Set<String> expectedStates, + Set<String> expectedTypes + ) { + return body -> { + if (body instanceof ListGroupsRequest) { + ListGroupsRequest request = (ListGroupsRequest) body; + return Objects.equals(new HashSet<>(request.data().statesFilter()), expectedStates) + && Objects.equals(new HashSet<>(request.data().typesFilter()), expectedTypes); + } + return false; + }; + } + + protected void verifyListConsumerGroupOffsetsOptions() throws Exception { + final Cluster cluster = mockCluster(3, 0); + final Time time = new MockTime(); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, + AdminClientConfig.RETRIES_CONFIG, "0")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + final List<TopicPartition> partitions = Collections.singletonList(new TopicPartition("A", 0)); + final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions() + .requireStable(true) + .timeoutMs(300); + + final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() + .topicPartitions(partitions); + env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, groupSpec), options); + + final MockClient mockClient = env.kafkaClient(); + waitForRequest(mockClient, ApiKeys.OFFSET_FETCH); + + ClientRequest clientRequest = mockClient.requests().peek(); + assertNotNull(clientRequest); + assertEquals(300, clientRequest.requestTimeoutMs()); + OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).build().data(); + assertTrue(data.requireStable()); + assertEquals(Collections.singletonList(GROUP_ID), + data.groups().stream().map(OffsetFetchRequestGroup::groupId).collect(Collectors.toList())); + assertEquals(Collections.singletonList("A"), + data.groups().get(0).topics().stream().map(OffsetFetchRequestTopics::name).collect(Collectors.toList())); + assertEquals(Collections.singletonList(0), + data.groups().get(0).topics().get(0).partitionIndexes()); + } + } + + protected Map<String, ListConsumerGroupOffsetsSpec> batchedListConsumerGroupOffsetsSpec() { + Set<TopicPartition> groupAPartitions = Collections.singleton(new TopicPartition("A", 1)); + Set<TopicPartition> groupBPartitions = Collections.singleton(new TopicPartition("B", 2)); + + ListConsumerGroupOffsetsSpec groupASpec = new ListConsumerGroupOffsetsSpec().topicPartitions(groupAPartitions); + ListConsumerGroupOffsetsSpec groupBSpec = new ListConsumerGroupOffsetsSpec().topicPartitions(groupBPartitions); + return Map.of("groupA", groupASpec, "groupB", groupBSpec); + } + + protected Map<String, ListStreamsGroupOffsetsSpec> batchedListStreamsGroupOffsetsSpec() { + Set<TopicPartition> groupAPartitions = Collections.singleton(new TopicPartition("A", 1)); + Set<TopicPartition> groupBPartitions = Collections.singleton(new TopicPartition("B", 2)); + + ListStreamsGroupOffsetsSpec groupASpec = new ListStreamsGroupOffsetsSpec().topicPartitions(groupAPartitions); + ListStreamsGroupOffsetsSpec groupBSpec = new ListStreamsGroupOffsetsSpec().topicPartitions(groupBPartitions); + return Map.of("groupA", groupASpec, "groupB", groupBSpec); + } + + protected void waitForRequest(MockClient mockClient, ApiKeys apiKeys) throws Exception { + TestUtils.waitForCondition(() -> { + ClientRequest clientRequest = mockClient.requests().peek(); + return clientRequest != null && clientRequest.apiKey() == apiKeys; + }, "Failed awaiting " + apiKeys + " request"); + } + + protected void sendFindCoordinatorResponse(MockClient mockClient, Node coordinator) throws Exception { + waitForRequest(mockClient, ApiKeys.FIND_COORDINATOR); + + ClientRequest clientRequest = mockClient.requests().peek(); + FindCoordinatorRequestData data = ((FindCoordinatorRequest.Builder) clientRequest.requestBuilder()).data(); + mockClient.respond(prepareFindCoordinatorResponse(Errors.NONE, data.key(), coordinator)); + } + + protected void sendOffsetFetchResponse(MockClient mockClient, Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, boolean batched, Errors error) throws Exception { + waitForRequest(mockClient, ApiKeys.OFFSET_FETCH); + + ClientRequest clientRequest = mockClient.requests().peek(); + OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).build().data(); + + if (!batched) { + assertEquals(1, data.groups().size()); + } + + OffsetFetchResponseData response = new OffsetFetchResponseData() + .setGroups(data.groups().stream().map(group -> + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId(group.groupId()) + .setErrorCode(error.code()) + .setTopics(groupSpecs.get(group.groupId()).topicPartitions().stream() + .collect(Collectors.groupingBy(TopicPartition::topic)).entrySet().stream().map(entry -> + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName(entry.getKey()) + .setPartitions(entry.getValue().stream().map(partition -> + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(partition.partition()) + .setCommittedOffset(10) + ).collect(Collectors.toList())) + ).collect(Collectors.toList())) + ).collect(Collectors.toList())); + + mockClient.respond(new OffsetFetchResponse(response, ApiKeys.OFFSET_FETCH.latestVersion())); + } + + protected void sendStreamsOffsetFetchResponse(MockClient mockClient, Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, boolean batched, Errors error) throws Exception { + waitForRequest(mockClient, ApiKeys.OFFSET_FETCH); + + ClientRequest clientRequest = mockClient.requests().peek(); + OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).build().data(); + + if (!batched) { + assertEquals(1, data.groups().size()); + } + + OffsetFetchResponseData response = new OffsetFetchResponseData() + .setGroups(data.groups().stream().map(group -> + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId(group.groupId()) + .setErrorCode(error.code()) + .setTopics(groupSpecs.get(group.groupId()).topicPartitions().stream() + .collect(Collectors.groupingBy(TopicPartition::topic)).entrySet().stream().map(entry -> + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName(entry.getKey()) + .setPartitions(entry.getValue().stream().map(partition -> + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(partition.partition()) + .setCommittedOffset(10) + ).collect(Collectors.toList())) + ).collect(Collectors.toList())) + ).collect(Collectors.toList())); + + mockClient.respond(new OffsetFetchResponse(response, ApiKeys.OFFSET_FETCH.latestVersion())); + } + + protected void verifyListOffsetsForMultipleGroups(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, + ListConsumerGroupOffsetsResult result) throws Exception { + assertEquals(groupSpecs.size(), result.all().get(10, TimeUnit.SECONDS).size()); + for (Map.Entry<String, ListConsumerGroupOffsetsSpec> entry : groupSpecs.entrySet()) { + assertEquals(entry.getValue().topicPartitions(), + result.partitionsToOffsetAndMetadata(entry.getKey()).get().keySet()); + } + } + + protected void verifyListStreamsOffsetsForMultipleGroups(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, + ListStreamsGroupOffsetsResult result) throws Exception { + assertEquals(groupSpecs.size(), result.all().get(10, TimeUnit.SECONDS).size()); + for (Map.Entry<String, ListStreamsGroupOffsetsSpec> entry : groupSpecs.entrySet()) { + assertEquals(entry.getValue().topicPartitions(), + result.partitionsToOffsetAndMetadata(entry.getKey()).get().keySet()); + } + } + + protected void testRemoveMembersFromGroup(String reason, String expectedReason) throws Exception { + final Cluster cluster = mockCluster(3, 0); + final Time time = new MockTime(); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster)) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse(body -> { + if (!(body instanceof LeaveGroupRequest)) { + return false; + } + LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) body).data(); + + return leaveGroupRequest.members().stream().allMatch( + member -> member.reason().equals(expectedReason) + ); + }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers( + asList( + new MemberResponse().setGroupInstanceId("instance-1"), + new MemberResponse().setGroupInstanceId("instance-2") + )) + )); + + MemberToRemove memberToRemove1 = new MemberToRemove("instance-1"); + MemberToRemove memberToRemove2 = new MemberToRemove("instance-2"); + + RemoveMembersFromConsumerGroupOptions options = new RemoveMembersFromConsumerGroupOptions(asList( + memberToRemove1, + memberToRemove2 + )); + options.reason(reason); + + final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup( + GROUP_ID, + options + ); + + assertNull(result.all().get()); + assertNull(result.memberResult(memberToRemove1).get()); + assertNull(result.memberResult(memberToRemove2).get()); + } + } + + protected Map<String, FeatureUpdate> makeTestFeatureUpdates() { + return Map.of( + "test_feature_1", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE), + "test_feature_2", new FeatureUpdate((short) 3, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)); + } + + protected void testUpdateFeatures(Map<String, FeatureUpdate> featureUpdates, + ApiError topLevelError, + Set<String> updates) throws Exception { + try (final AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().prepareResponse( + body -> body instanceof UpdateFeaturesRequest, + UpdateFeaturesResponse.createWithErrors(topLevelError, updates, 0)); + final Map<String, KafkaFuture<Void>> futures = env.adminClient().updateFeatures( + featureUpdates, + new UpdateFeaturesOptions().timeoutMs(10000)).values(); + for (final Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) { + final KafkaFuture<Void> future = entry.getValue(); + if (topLevelError.error() == Errors.NONE) { + // Since the top level error was NONE, each future should be successful. + future.get(); + } else { + final ExecutionException e = assertThrows(ExecutionException.class, future::get); + assertEquals(e.getCause().getClass(), topLevelError.exception().getClass()); + assertEquals(e.getCause().getMessage(), topLevelError.exception().getMessage()); + } + } + } + } + + protected static Stream<Arguments> listOffsetsMetadataNonRetriableErrors() { + return Stream.of( + Arguments.of( + Errors.TOPIC_AUTHORIZATION_FAILED, + Errors.TOPIC_AUTHORIZATION_FAILED, + TopicAuthorizationException.class + ), + Arguments.of( + // We fail fast when the entire topic is unknown... + Errors.UNKNOWN_TOPIC_OR_PARTITION, + Errors.NONE, + UnknownTopicOrPartitionException.class + ), + Arguments.of( + // ... even if a partition in the topic is also somehow reported as unknown... + Errors.UNKNOWN_TOPIC_OR_PARTITION, + Errors.UNKNOWN_TOPIC_OR_PARTITION, + UnknownTopicOrPartitionException.class + ), + Arguments.of( + // ... or a partition in the topic has a different, otherwise-retriable error + Errors.UNKNOWN_TOPIC_OR_PARTITION, + Errors.LEADER_NOT_AVAILABLE, + UnknownTopicOrPartitionException.class + ) + ); + } + + protected void testApiTimeout(int requestTimeoutMs, + int defaultApiTimeoutMs, + OptionalInt overrideApiTimeoutMs) throws Exception { + HashMap<Integer, Node> nodes = new HashMap<>(); + MockTime time = new MockTime(); + Node node0 = new Node(0, "localhost", 8121); + nodes.put(0, node0); + Cluster cluster = new Cluster("mockClusterId", nodes.values(), + singletonList(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})), + Collections.emptySet(), Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + final int retryBackoffMs = 100; + final int effectiveTimeoutMs = overrideApiTimeoutMs.orElse(defaultApiTimeoutMs); + assertEquals(2 * requestTimeoutMs, effectiveTimeoutMs, + "This test expects the effective timeout to be twice the request timeout"); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, + AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs), + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs), + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, String.valueOf(defaultApiTimeoutMs))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + ListTopicsOptions options = new ListTopicsOptions(); + overrideApiTimeoutMs.ifPresent(options::timeoutMs); + + final ListTopicsResult result = env.adminClient().listTopics(options); + + // Wait until the first attempt has been sent, then advance the time + TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(), + "Timed out waiting for Metadata request to be sent"); + time.sleep(requestTimeoutMs + 1); + + // Wait for the request to be timed out before backing off + TestUtils.waitForCondition(() -> !env.kafkaClient().hasInFlightRequests(), + "Timed out waiting for inFlightRequests to be timed out"); + + // Since api timeout bound is not hit, AdminClient should retry + TestUtils.waitForCondition(() -> { + boolean hasInflightRequests = env.kafkaClient().hasInFlightRequests(); + if (!hasInflightRequests) + time.sleep(retryBackoffMs); + return hasInflightRequests; + }, "Timed out waiting for Metadata request to be sent"); + time.sleep(requestTimeoutMs + 1); + + TestUtils.assertFutureThrows(TimeoutException.class, result.future); + } + } + + protected ClientQuotaEntity newClientQuotaEntity(String... args) { + assertEquals(0, args.length % 2); + + Map<String, String> entityMap = new HashMap<>(args.length / 2); + for (int index = 0; index < args.length; index += 2) { + entityMap.put(args[index], args[index + 1]); + } + return new ClientQuotaEntity(entityMap); + } + + protected void createAlterLogDirsResponse(AdminClientUnitTestEnv env, Node node, Errors error, int... partitions) { + env.kafkaClient().prepareResponseFrom( + prepareAlterLogDirsResponse(error, "topic", partitions), node); + } + + protected AlterReplicaLogDirsResponse prepareAlterLogDirsResponse(Errors error, String topic, int... partitions) { + return new AlterReplicaLogDirsResponse( + new AlterReplicaLogDirsResponseData().setResults(singletonList( + new AlterReplicaLogDirTopicResult() + .setTopicName(topic) + .setPartitions(Arrays.stream(partitions).boxed().map(partitionId -> + new AlterReplicaLogDirPartitionResult() + .setPartitionIndex(partitionId) + .setErrorCode(error.code())).collect(Collectors.toList()))))); + } + + protected WriteTxnMarkersResponse writeTxnMarkersResponse( + AbortTransactionSpec abortSpec, + Errors error + ) { + WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult partitionResponse = + new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult() + .setPartitionIndex(abortSpec.topicPartition().partition()) + .setErrorCode(error.code()); + + WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult topicResponse = + new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult() + .setName(abortSpec.topicPartition().topic()); + topicResponse.partitions().add(partitionResponse); + + WriteTxnMarkersResponseData.WritableTxnMarkerResult markerResponse = + new WriteTxnMarkersResponseData.WritableTxnMarkerResult() + .setProducerId(abortSpec.producerId()); + markerResponse.topics().add(topicResponse); + + WriteTxnMarkersResponseData response = new WriteTxnMarkersResponseData(); + response.markers().add(markerResponse); + + return new WriteTxnMarkersResponse(response); + } + + protected DescribeProducersResponse buildDescribeProducersResponse( + TopicPartition topicPartition, + List<ProducerState> producerStates + ) { + DescribeProducersResponseData response = new DescribeProducersResponseData(); + + DescribeProducersResponseData.TopicResponse topicResponse = + new DescribeProducersResponseData.TopicResponse() + .setName(topicPartition.topic()); + response.topics().add(topicResponse); + + DescribeProducersResponseData.PartitionResponse partitionResponse = + new DescribeProducersResponseData.PartitionResponse() + .setPartitionIndex(topicPartition.partition()) + .setErrorCode(Errors.NONE.code()); + topicResponse.partitions().add(partitionResponse); + + partitionResponse.setActiveProducers(producerStates.stream().map(producerState -> + new DescribeProducersResponseData.ProducerState() + .setProducerId(producerState.producerId()) + .setProducerEpoch(producerState.producerEpoch()) + .setCoordinatorEpoch(producerState.coordinatorEpoch().orElse(-1)) + .setLastSequence(producerState.lastSequence()) + .setLastTimestamp(producerState.lastTimestamp()) + .setCurrentTxnStartOffset(producerState.currentTransactionStartOffset().orElse(-1L)) + ).collect(Collectors.toList())); + + return new DescribeProducersResponse(response); + } + + protected void expectMetadataRequest( + AdminClientUnitTestEnv env, + TopicPartition topicPartition, + Node leader + ) { + MetadataResponseData.MetadataResponseTopicCollection responseTopics = + new MetadataResponseData.MetadataResponseTopicCollection(); + + MetadataResponseTopic responseTopic = new MetadataResponseTopic() + .setName(topicPartition.topic()) + .setErrorCode(Errors.NONE.code()); + responseTopics.add(responseTopic); + + MetadataResponsePartition responsePartition = new MetadataResponsePartition() + .setErrorCode(Errors.NONE.code()) + .setPartitionIndex(topicPartition.partition()) + .setLeaderId(leader.id()) + .setReplicaNodes(singletonList(leader.id())) + .setIsrNodes(singletonList(leader.id())); + responseTopic.partitions().add(responsePartition); + + env.kafkaClient().prepareResponse( + request -> { + if (!(request instanceof MetadataRequest)) { + return false; + } + MetadataRequest metadataRequest = (MetadataRequest) request; + return metadataRequest.topics().equals(singletonList(topicPartition.topic())); + }, + new MetadataResponse(new MetadataResponseData().setTopics(responseTopics), + MetadataResponseData.HIGHEST_SUPPORTED_VERSION) + ); + } + + protected UnregisterBrokerResponse prepareUnregisterBrokerResponse(Errors error, int throttleTimeMs) { + return new UnregisterBrokerResponse(new UnregisterBrokerResponseData() + .setErrorCode(error.code()) + .setErrorMessage(error.message()) + .setThrottleTimeMs(throttleTimeMs)); + } + + protected DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir) { + return new DescribeLogDirsResponse(new DescribeLogDirsResponseData() + .setResults(Collections.singletonList( + new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(error.code()) + .setLogDir(logDir)))); + } + + protected static OffsetFetchResponse offsetFetchResponse(Errors error) { + return new OffsetFetchResponse( + new OffsetFetchResponseData() + .setGroups(List.of( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId(GROUP_ID) + .setErrorCode(error.code()) + )), + ApiKeys.OFFSET_FETCH.latestVersion() + ); + } + + protected static MemberDescription convertToMemberDescriptions(DescribedGroupMember member, + MemberAssignment assignment) { + return new MemberDescription(member.memberId(), + Optional.ofNullable(member.groupInstanceId()), + Optional.empty(), + member.clientId(), + member.clientHost(), + assignment, + Optional.empty(), + Optional.empty(), + Optional.empty()); + } + + protected static ShareMemberDescription convertToShareMemberDescriptions(ShareGroupDescribeResponseData.Member member, Review Comment: Thanks @chia7712 , I have updated the PR to move all the helpers to their respective test classes, keeping only the commonly used ones in the Base class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
