This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push: new d5b53ad KAFKA-12879: Revert "KAFKA-12339: Add retry to admin client's listOffsets (#10152)" d5b53ad is described below commit d5b53ad132d1c1bfcd563ce5015884b6da831777 Author: Randall Hauch <rha...@gmail.com> AuthorDate: Thu Mar 10 15:16:24 2022 -0600 KAFKA-12879: Revert "KAFKA-12339: Add retry to admin client's listOffsets (#10152)" This reverts commit fe132ee29329116a60e77b1d3e56aef42ae6347c. --- .../admin/internals/MetadataOperationContext.java | 1 - .../kafka/clients/admin/KafkaAdminClientTest.java | 41 ++-------------------- 2 files changed, 2 insertions(+), 40 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java index e7f2c07..c05e5cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java @@ -82,7 +82,6 @@ public final class MetadataOperationContext<T, O extends AbstractOptions<O>> { public static void handleMetadataErrors(MetadataResponse response) { for (TopicMetadata tm : response.topicMetadata()) { - if (shouldRefreshMetadata(tm.error())) throw tm.error().exception(); for (PartitionMetadata pm : tm.partitionMetadata()) { if (shouldRefreshMetadata(pm.error)) { throw pm.error.exception(); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 7f30846..28b4640 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -342,15 +342,11 @@ public class KafkaAdminClientTest { } private static MetadataResponse prepareMetadataResponse(Cluster 
cluster, Errors error) { - return prepareMetadataResponse(cluster, error, error); - } - - private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) { List<TopicMetadata> metadata = new ArrayList<>(); for (String topic : cluster.topics()) { List<PartitionMetadata> pms = new ArrayList<>(); for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) { - PartitionMetadata pm = new PartitionMetadata(partitionError, + PartitionMetadata pm = new PartitionMetadata(error, new TopicPartition(topic, pInfo.partition()), Optional.of(pInfo.leader().id()), Optional.of(234), @@ -359,7 +355,7 @@ public class KafkaAdminClientTest { Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList())); pms.add(pm); } - TopicMetadata tm = new TopicMetadata(topicError, topic, false, pms); + TopicMetadata tm = new TopicMetadata(error, topic, false, pms); metadata.add(tm); } return MetadataResponse.prepareResponse(0, @@ -2347,39 +2343,6 @@ public class KafkaAdminClientTest { } @Test - public void testListOffsetsRetriableErrorOnMetadata() throws Exception { - Node node = new Node(0, "localhost", 8120); - List<Node> nodes = Collections.singletonList(node); - final Cluster cluster = new Cluster( - "mockClusterId", - nodes, - Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), - Collections.emptySet(), - Collections.emptySet(), - node); - final TopicPartition tp0 = new TopicPartition("foo", 0); - - try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE)); - // metadata refresh because of UNKNOWN_TOPIC_OR_PARTITION - env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); - // listoffsets response from broker 0 - Map<TopicPartition, PartitionData> 
responseData = new HashMap<>(); - responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 123L, Optional.of(321))); - env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); - - ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latest())); - - Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS); - assertEquals(1, offsets.size()); - assertEquals(123L, offsets.get(tp0).offset()); - assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue()); - assertEquals(-1L, offsets.get(tp0).timestamp()); - } - } - - @Test public void testListOffsetsRetriableErrors() throws Exception { Node node0 = new Node(0, "localhost", 8120);