This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 697a4af  KAFKA-6363: Use MockAdminClient for any unit tests that depend on AdminClient (#4371)
697a4af is described below

commit 697a4af35a3672dd339c3a6a34d5ab27e31e0fbd
Author: Filipe Agapito <filipe.agap...@gmail.com>
AuthorDate: Mon Jan 8 19:58:56 2018 +0000

    KAFKA-6363: Use MockAdminClient for any unit tests that depend on AdminClient (#4371)
    
    * Implement MockAdminClient.deleteTopics
    * Use MockAdminClient instead of MockKafkaAdminClientEnv in StreamsResetterTest
    * Rename MockKafkaAdminClientEnv to AdminClientUnitTestEnv
    * Use MockAdminClient instead of MockKafkaAdminClientEnv in TopicAdminTest
    * Rename KafkaAdminClient to AdminClientUnitTestEnv in KafkaAdminClientTest.java
    * Migrate StreamThreadTest to MockAdminClient
    * Fix style errors
    * Address review comments
    * Fix MockAdminClient call
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>, Konstantine Karantasis <konstant...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 ...nClientEnv.java => AdminClientUnitTestEnv.java} |  8 ++--
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 28 +++++------
 .../kafka/clients/admin/MockAdminClient.java       | 35 ++++++++++++--
 .../apache/kafka/connect/util/TopicAdminTest.java  | 55 +++++++---------------
 .../processor/internals/StreamThreadTest.java      | 13 +++++
 .../kafka/streams/tools/StreamsResetterTest.java   | 22 ++++-----
 .../org/apache/kafka/test/MockClientSupplier.java  |  6 +--
 7 files changed, 92 insertions(+), 75 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
similarity index 91%
rename from 
clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
rename to 
clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index cca35ac..10281fb 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -35,21 +35,21 @@ import java.util.Map;
  * <p>
  * When finished, be sure to {@link #close() close} the environment object.
  */
-public class MockKafkaAdminClientEnv implements AutoCloseable {
+public class AdminClientUnitTestEnv implements AutoCloseable {
     private final Time time;
     private final Cluster cluster;
     private final MockClient mockClient;
     private final KafkaAdminClient adminClient;
 
-    public MockKafkaAdminClientEnv(Cluster cluster, String...vals) {
+    public AdminClientUnitTestEnv(Cluster cluster, String...vals) {
         this(Time.SYSTEM, cluster, vals);
     }
 
-    public MockKafkaAdminClientEnv(Time time, Cluster cluster, String...vals) {
+    public AdminClientUnitTestEnv(Time time, Cluster cluster, String...vals) {
         this(time, cluster, newStrMap(vals));
     }
 
-    public MockKafkaAdminClientEnv(Time time, Cluster cluster, Map<String, 
Object> config) {
+    public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, 
Object> config) {
         this.time = time;
         this.cluster = cluster;
         AdminClientConfig adminClientConfig = new AdminClientConfig(config);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index c0fe73c..84588a9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -38,10 +38,10 @@ import 
org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
@@ -75,8 +75,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import static java.util.Arrays.asList;
-import static org.apache.kafka.common.requests.ResourceType.TOPIC;
 import static org.apache.kafka.common.requests.ResourceType.BROKER;
+import static org.apache.kafka.common.requests.ResourceType.TOPIC;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -155,7 +155,7 @@ public class KafkaAdminClientTest {
                 
KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG,
 "myCustomId")));
     }
 
-    private static MockKafkaAdminClientEnv mockClientEnv(String... configVals) 
{
+    private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
         HashMap<Integer, Node> nodes = new HashMap<>();
         nodes.put(0, new Node(0, "localhost", 8121));
         nodes.put(1, new Node(1, "localhost", 8122));
@@ -163,12 +163,12 @@ public class KafkaAdminClientTest {
         Cluster cluster = new Cluster("mockClusterId", nodes.values(),
                 Collections.<PartitionInfo>emptySet(), 
Collections.<String>emptySet(),
                 Collections.<String>emptySet(), nodes.get(0));
-        return new MockKafkaAdminClientEnv(cluster, configVals);
+        return new AdminClientUnitTestEnv(cluster, configVals);
     }
 
     @Test
     public void testCloseAdminClient() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
         }
     }
 
@@ -190,7 +190,7 @@ public class KafkaAdminClientTest {
      */
     @Test
     public void testTimeoutWithoutMetadata() throws Exception {
-        try (MockKafkaAdminClientEnv env = 
mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) {
+        try (AdminClientUnitTestEnv env = 
mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().setNode(new Node(0, "localhost", 8121));
             env.kafkaClient().prepareResponse(new 
CreateTopicsResponse(Collections.singletonMap("myTopic", new 
ApiError(Errors.NONE, ""))));
@@ -203,7 +203,7 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testCreateTopics() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
@@ -226,7 +226,7 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testDescribeAcls() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
@@ -250,7 +250,7 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testCreateAcls() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
@@ -279,7 +279,7 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testDeleteAcls() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
@@ -330,7 +330,7 @@ public class KafkaAdminClientTest {
         Cluster cluster = new Cluster("mockClusterId", nodes.values(),
             Collections.<PartitionInfo>emptySet(), 
Collections.<String>emptySet(),
             Collections.<String>emptySet(), nodes.get(0));
-        try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(time, 
cluster,
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster,
             AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1",
                 AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "1")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@@ -374,7 +374,7 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testDescribeConfigs() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
@@ -390,7 +390,7 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testCreatePartitions() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
@@ -443,7 +443,7 @@ public class KafkaAdminClientTest {
         TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3);
         TopicPartition myTopicPartition4 = new TopicPartition("my_topic", 4);
 
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().nodes().get(0));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index b6a5888..c950163 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -232,8 +232,32 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
-    public DeleteTopicsResult deleteTopics(Collection<String> topics, 
DeleteTopicsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+    public DeleteTopicsResult deleteTopics(Collection<String> topicsToDelete, 
DeleteTopicsOptions options) {
+        Map<String, KafkaFuture<Void>> deleteTopicsResult = new HashMap<>();
+
+        if (timeoutNextRequests > 0) {
+            for (final String topicName : topicsToDelete) {
+                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new TimeoutException());
+                deleteTopicsResult.put(topicName, future);
+            }
+
+            --timeoutNextRequests;
+            return new DeleteTopicsResult(deleteTopicsResult);
+        }
+
+        for (final String topicName : topicsToDelete) {
+            KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+
+            if (allTopics.remove(topicName) == null) {
+                future.completeExceptionally(new 
UnknownTopicOrPartitionException(String.format("Topic %s does not exist.", 
topicName)));
+            } else {
+                future.complete(null);
+            }
+            deleteTopicsResult.put(topicName, future);
+        }
+
+        return new DeleteTopicsResult(deleteTopicsResult);
     }
 
     @Override
@@ -243,7 +267,12 @@ public class MockAdminClient extends AdminClient {
 
     @Override
     public DeleteRecordsResult deleteRecords(Map<TopicPartition, 
RecordsToDelete> recordsToDelete, DeleteRecordsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        Map<TopicPartition, KafkaFuture<DeletedRecords>> deletedRecordsResult 
= new HashMap<>();
+        if (recordsToDelete.isEmpty()) {
+            return new DeleteRecordsResult(deletedRecordsResult);
+        } else {
+            throw new UnsupportedOperationException("Not implemented yet");
+        }
     }
 
     @Override
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 0a61d3e..c58d674 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -17,11 +17,13 @@
 package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
@@ -47,7 +49,7 @@ public class TopicAdminTest {
     public void returnNullWithApiVersionMismatch() {
         final NewTopic newTopic = 
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
             env.kafkaClient().setNode(cluster.controller());
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
@@ -62,14 +64,11 @@ public class TopicAdminTest {
     public void shouldNotCreateTopicWhenItAlreadyExists() {
         NewTopic newTopic = 
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.controller());
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
-            
env.kafkaClient().prepareResponse(createTopicResponseWithAlreadyExists(newTopic));
-            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
-            boolean created = admin.createTopic(newTopic);
-            assertFalse(created);
+        try (MockAdminClient mockAdminClient = new 
MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, 
cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList());
+            mockAdminClient.addTopic(false, "myTopic", 
Collections.singletonList(topicPartitionInfo), null);
+            TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
+            assertFalse(admin.createTopic(newTopic));
         }
     }
 
@@ -77,14 +76,9 @@ public class TopicAdminTest {
     public void shouldCreateTopicWhenItDoesNotExist() {
         NewTopic newTopic = 
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.controller());
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
-            env.kafkaClient().prepareResponse(createTopicResponse(newTopic));
-            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
-            boolean created = admin.createTopic(newTopic);
-            assertTrue(created);
+        try (MockAdminClient mockAdminClient = new 
MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
+            assertTrue(admin.createTopic(newTopic));
         }
     }
 
@@ -93,12 +87,8 @@ public class TopicAdminTest {
         NewTopic newTopic1 = 
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         NewTopic newTopic2 = 
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.controller());
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
-            env.kafkaClient().prepareResponse(createTopicResponse(newTopic1));
-            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+        try (MockAdminClient mockAdminClient = new 
MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
             Set<String> newTopicNames = admin.createTopics(newTopic1, 
newTopic2);
             assertEquals(1, newTopicNames.size());
             assertEquals(newTopic2.name(), newTopicNames.iterator().next());
@@ -108,11 +98,8 @@ public class TopicAdminTest {
     @Test
     public void shouldReturnFalseWhenSuppliedNullTopicDescription() {
         Cluster cluster = createCluster(1);
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.controller());
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
-            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+        try (MockAdminClient mockAdminClient = new 
MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
             boolean created = admin.createTopic(null);
             assertFalse(created);
         }
@@ -120,7 +107,7 @@ public class TopicAdminTest {
 
     private Cluster createCluster(int numNodes) {
         HashMap<Integer, Node> nodes = new HashMap<>();
-        for (int i = 0; i != numNodes; ++i) {
+        for (int i = 0; i < numNodes; ++i) {
             nodes.put(i, new Node(i, "localhost", 8121 + i));
         }
         Cluster cluster = new Cluster("mockClusterId", nodes.values(),
@@ -129,14 +116,6 @@ public class TopicAdminTest {
         return cluster;
     }
 
-    private CreateTopicsResponse createTopicResponse(NewTopic... topics) {
-        return createTopicResponse(new ApiError(Errors.NONE, ""), topics);
-    }
-
-    private CreateTopicsResponse 
createTopicResponseWithAlreadyExists(NewTopic... topics) {
-        return createTopicResponse(new ApiError(Errors.TOPIC_ALREADY_EXISTS, 
"Topic already exists"), topics);
-    }
-
     private CreateTopicsResponse 
createTopicResponseWithUnsupportedVersion(NewTopic... topics) {
         return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, 
"This version of the API is not supported"), topics);
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 4250465..6b760c1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -186,11 +187,23 @@ public class StreamThreadTest {
         assertEquals(thread.state(), StreamThread.State.DEAD);
     }
 
+    private Cluster createCluster(int numNodes) {
+        HashMap<Integer, Node> nodes = new HashMap<>();
+        for (int i = 0; i < numNodes; ++i) {
+            nodes.put(i, new Node(i, "localhost", 8121 + i));
+        }
+        return new Cluster("mockClusterId", nodes.values(),
+            Collections.<PartitionInfo>emptySet(), 
Collections.<String>emptySet(),
+            Collections.<String>emptySet(), nodes.get(0));
+    }
+
     private StreamThread createStreamThread(final String clientId, final 
StreamsConfig config, final boolean eosEnabled) {
         if (eosEnabled) {
             clientSupplier.setApplicationIdForProducer(applicationId);
         }
 
+        clientSupplier.setClusterForAdminClient(createCluster(1));
+
         return StreamThread.create(internalTopologyBuilder,
                                    config,
                                    clientSupplier,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java 
b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
index c6b0f5f..dd32ad0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -17,8 +17,7 @@
 package org.apache.kafka.streams.tools;
 
 import kafka.tools.StreamsResetter;
-import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv;
+import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -27,8 +26,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.DeleteTopicsResponse;
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -40,6 +38,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -235,20 +234,19 @@ public class StreamsResetterTest {
     }
 
     @Test
-    public void shouldDeleteTopic() {
+    public void shouldDeleteTopic() throws InterruptedException, 
ExecutionException {
         Cluster cluster = createCluster(1);
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.controller());
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
-            env.kafkaClient().prepareResponse(new 
DeleteTopicsResponse(Collections.singletonMap(TOPIC, Errors.NONE)));
-            
streamsResetter.doDelete(Collections.singletonList(topicPartition.topic()), 
env.adminClient());
+        try (MockAdminClient adminClient = new 
MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, 
cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList());
+            adminClient.addTopic(false, TOPIC, 
Collections.singletonList(topicPartitionInfo), null);
+            streamsResetter.doDelete(Collections.singletonList(TOPIC), 
adminClient);
+            assertEquals(Collections.emptySet(), 
adminClient.listTopics().names().get());
         }
     }
 
     private Cluster createCluster(int numNodes) {
         HashMap<Integer, Node> nodes = new HashMap<>();
-        for (int i = 0; i != numNodes; ++i) {
+        for (int i = 0; i < numNodes; ++i) {
             nodes.put(i, new Node(i, "localhost", 8121 + i));
         }
         return new Cluster("mockClusterId", nodes.values(),
diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java 
b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index ae83c60..1ec28fa 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv;
+import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -26,7 +26,6 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 
 import java.util.LinkedList;
@@ -59,8 +58,7 @@ public class MockClientSupplier implements 
KafkaClientSupplier {
 
     @Override
     public AdminClient getAdminClient(final Map<String, Object> config) {
-        MockKafkaAdminClientEnv clientEnv = new 
MockKafkaAdminClientEnv(Time.SYSTEM, cluster, config);
-        return clientEnv.adminClient();
+        return new MockAdminClient(cluster.nodes(), cluster.nodeById(0));
     }
 
     @Override

-- 
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <commits@kafka.apache.org>.

Reply via email to