This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new d5b53ad  KAFKA-12879: Revert "KAFKA-12339: Add retry to admin client's listOffsets (#10152)"
d5b53ad is described below

commit d5b53ad132d1c1bfcd563ce5015884b6da831777
Author: Randall Hauch <rha...@gmail.com>
AuthorDate: Thu Mar 10 15:16:24 2022 -0600

    KAFKA-12879: Revert "KAFKA-12339: Add retry to admin client's listOffsets (#10152)"
    
    This reverts commit fe132ee29329116a60e77b1d3e56aef42ae6347c.
---
 .../admin/internals/MetadataOperationContext.java  |  1 -
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 41 ++--------------------
 2 files changed, 2 insertions(+), 40 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
index e7f2c07..c05e5cf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -82,7 +82,6 @@ public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
 
     public static void handleMetadataErrors(MetadataResponse response) {
         for (TopicMetadata tm : response.topicMetadata()) {
-            if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();
             for (PartitionMetadata pm : tm.partitionMetadata()) {
                 if (shouldRefreshMetadata(pm.error)) {
                     throw pm.error.exception();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 7f30846..28b4640 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -342,15 +342,11 @@ public class KafkaAdminClientTest {
     }
 
     private static MetadataResponse prepareMetadataResponse(Cluster cluster, 
Errors error) {
-        return prepareMetadataResponse(cluster, error, error);
-    }
-
-    private static MetadataResponse prepareMetadataResponse(Cluster cluster, 
Errors topicError, Errors partitionError) {
         List<TopicMetadata> metadata = new ArrayList<>();
         for (String topic : cluster.topics()) {
             List<PartitionMetadata> pms = new ArrayList<>();
             for (PartitionInfo pInfo : 
cluster.availablePartitionsForTopic(topic)) {
-                PartitionMetadata pm = new PartitionMetadata(partitionError,
+                PartitionMetadata pm = new PartitionMetadata(error,
                         new TopicPartition(topic, pInfo.partition()),
                         Optional.of(pInfo.leader().id()),
                         Optional.of(234),
@@ -359,7 +355,7 @@ public class KafkaAdminClientTest {
                         
Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
                 pms.add(pm);
             }
-            TopicMetadata tm = new TopicMetadata(topicError, topic, false, 
pms);
+            TopicMetadata tm = new TopicMetadata(error, topic, false, pms);
             metadata.add(tm);
         }
         return MetadataResponse.prepareResponse(0,
@@ -2347,39 +2343,6 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testListOffsetsRetriableErrorOnMetadata() throws Exception {
-        Node node = new Node(0, "localhost", 8120);
-        List<Node> nodes = Collections.singletonList(node);
-        final Cluster cluster = new Cluster(
-            "mockClusterId",
-            nodes,
-            Collections.singleton(new PartitionInfo("foo", 0, node, new 
Node[]{node}, new Node[]{node})),
-            Collections.emptySet(),
-            Collections.emptySet(),
-            node);
-        final TopicPartition tp0 = new TopicPartition("foo", 0);
-
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
-            // metadata refresh because of UNKNOWN_TOPIC_OR_PARTITION
-            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
-            // listoffsets response from broker 0
-            Map<TopicPartition, PartitionData> responseData = new HashMap<>();
-            responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 123L, 
Optional.of(321)));
-            env.kafkaClient().prepareResponse(new 
ListOffsetResponse(responseData));
-
-            ListOffsetsResult result = 
env.adminClient().listOffsets(Collections.singletonMap(tp0, 
OffsetSpec.latest()));
-
-            Map<TopicPartition, ListOffsetsResultInfo> offsets = 
result.all().get(3, TimeUnit.SECONDS);
-            assertEquals(1, offsets.size());
-            assertEquals(123L, offsets.get(tp0).offset());
-            assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue());
-            assertEquals(-1L, offsets.get(tp0).timestamp());
-        }
-    }
-
-    @Test
     public void testListOffsetsRetriableErrors() throws Exception {
 
         Node node0 = new Node(0, "localhost", 8120);

Reply via email to