This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 697a4af KAFKA-6363: Use MockAdminClient for any unit tests that depend on AdminClient (#4371) 697a4af is described below commit 697a4af35a3672dd339c3a6a34d5ab27e31e0fbd Author: Filipe Agapito <filipe.agap...@gmail.com> AuthorDate: Mon Jan 8 19:58:56 2018 +0000 KAFKA-6363: Use MockAdminClient for any unit tests that depend on AdminClient (#4371) * Implement MockAdminClient.deleteTopics * Use MockAdminClient instead of MockKafkaAdminClientEnv in StreamsResetterTest * Rename MockKafkaAdminClientEnv to AdminClientUnitTestEnv * Use MockAdminClient instead of MockKafkaAdminClientEnv in TopicAdminTest * Rename KafkaAdminClient to AdminClientUnitTestEnv in KafkaAdminClientTest.java * Migrate StreamThreadTest to MockAdminClient * Fix style errors * Address review comments * Fix MockAdminClient call Reviewers: Matthias J. Sax <matth...@confluent.io>, Konstantine Karantasis <konstant...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- ...nClientEnv.java => AdminClientUnitTestEnv.java} | 8 ++-- .../kafka/clients/admin/KafkaAdminClientTest.java | 28 +++++------ .../kafka/clients/admin/MockAdminClient.java | 35 ++++++++++++-- .../apache/kafka/connect/util/TopicAdminTest.java | 55 +++++++--------------- .../processor/internals/StreamThreadTest.java | 13 +++++ .../kafka/streams/tools/StreamsResetterTest.java | 22 ++++----- .../org/apache/kafka/test/MockClientSupplier.java | 6 +-- 7 files changed, 92 insertions(+), 75 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java similarity index 91% rename from clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java rename to clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java index cca35ac..10281fb 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java @@ -35,21 +35,21 @@ import java.util.Map; * <p> * When finished, be sure to {@link #close() close} the environment object. */ -public class MockKafkaAdminClientEnv implements AutoCloseable { +public class AdminClientUnitTestEnv implements AutoCloseable { private final Time time; private final Cluster cluster; private final MockClient mockClient; private final KafkaAdminClient adminClient; - public MockKafkaAdminClientEnv(Cluster cluster, String...vals) { + public AdminClientUnitTestEnv(Cluster cluster, String...vals) { this(Time.SYSTEM, cluster, vals); } - public MockKafkaAdminClientEnv(Time time, Cluster cluster, String...vals) { + public AdminClientUnitTestEnv(Time time, Cluster cluster, String...vals) { this(time, cluster, newStrMap(vals)); } - public MockKafkaAdminClientEnv(Time time, Cluster cluster, Map<String, Object> config) { + public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config) { this.time = time; this.cluster = cluster; AdminClientConfig adminClientConfig = new AdminClientConfig(config); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index c0fe73c..84588a9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -38,10 +38,10 @@ import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.ApiError; import 
org.apache.kafka.common.requests.CreateAclsResponse; import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; +import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; @@ -75,8 +75,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import static java.util.Arrays.asList; -import static org.apache.kafka.common.requests.ResourceType.TOPIC; import static org.apache.kafka.common.requests.ResourceType.BROKER; +import static org.apache.kafka.common.requests.ResourceType.TOPIC; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -155,7 +155,7 @@ public class KafkaAdminClientTest { KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId"))); } - private static MockKafkaAdminClientEnv mockClientEnv(String... configVals) { + private static AdminClientUnitTestEnv mockClientEnv(String... 
configVals) { HashMap<Integer, Node> nodes = new HashMap<>(); nodes.put(0, new Node(0, "localhost", 8121)); nodes.put(1, new Node(1, "localhost", 8122)); @@ -163,12 +163,12 @@ public class KafkaAdminClientTest { Cluster cluster = new Cluster("mockClusterId", nodes.values(), Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(), Collections.<String>emptySet(), nodes.get(0)); - return new MockKafkaAdminClientEnv(cluster, configVals); + return new AdminClientUnitTestEnv(cluster, configVals); } @Test public void testCloseAdminClient() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { } } @@ -190,7 +190,7 @@ public class KafkaAdminClientTest { */ @Test public void testTimeoutWithoutMetadata() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) { + try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().setNode(new Node(0, "localhost", 8121)); env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, "")))); @@ -203,7 +203,7 @@ public class KafkaAdminClientTest { @Test public void testCreateTopics() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().controller()); @@ -226,7 +226,7 @@ public class KafkaAdminClientTest { @Test public void testDescribeAcls() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); 
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().controller()); @@ -250,7 +250,7 @@ public class KafkaAdminClientTest { @Test public void testCreateAcls() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().controller()); @@ -279,7 +279,7 @@ public class KafkaAdminClientTest { @Test public void testDeleteAcls() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().controller()); @@ -330,7 +330,7 @@ public class KafkaAdminClientTest { Cluster cluster = new Cluster("mockClusterId", nodes.values(), Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(), Collections.<String>emptySet(), nodes.get(0)); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(time, cluster, + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1", AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "1")) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); @@ -374,7 +374,7 @@ public class KafkaAdminClientTest { @Test public void testDescribeConfigs() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); 
env.kafkaClient().setNode(env.cluster().controller()); @@ -390,7 +390,7 @@ public class KafkaAdminClientTest { @Test public void testCreatePartitions() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().controller()); @@ -443,7 +443,7 @@ public class KafkaAdminClientTest { TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3); TopicPartition myTopicPartition4 = new TopicPartition("my_topic", 4); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().nodes().get(0)); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index b6a5888..c950163 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -232,8 +232,32 @@ public class MockAdminClient extends AdminClient { } @Override - public DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options) { - throw new UnsupportedOperationException("Not implemented yet"); + public DeleteTopicsResult deleteTopics(Collection<String> topicsToDelete, DeleteTopicsOptions options) { + Map<String, KafkaFuture<Void>> deleteTopicsResult = new HashMap<>(); + + if (timeoutNextRequests > 0) { + for (final String topicName : topicsToDelete) { + KafkaFutureImpl<Void> future = new KafkaFutureImpl<>(); + 
future.completeExceptionally(new TimeoutException()); + deleteTopicsResult.put(topicName, future); + } + + --timeoutNextRequests; + return new DeleteTopicsResult(deleteTopicsResult); + } + + for (final String topicName : topicsToDelete) { + KafkaFutureImpl<Void> future = new KafkaFutureImpl<>(); + + if (allTopics.remove(topicName) == null) { + future.completeExceptionally(new UnknownTopicOrPartitionException(String.format("Topic %s does not exist.", topicName))); + } else { + future.complete(null); + } + deleteTopicsResult.put(topicName, future); + } + + return new DeleteTopicsResult(deleteTopicsResult); } @Override @@ -243,7 +267,12 @@ public class MockAdminClient extends AdminClient { @Override public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete, DeleteRecordsOptions options) { - throw new UnsupportedOperationException("Not implemented yet"); + Map<TopicPartition, KafkaFuture<DeletedRecords>> deletedRecordsResult = new HashMap<>(); + if (recordsToDelete.isEmpty()) { + return new DeleteRecordsResult(deletedRecordsResult); + } else { + throw new UnsupportedOperationException("Not implemented yet"); + } } @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index 0a61d3e..c58d674 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -17,11 +17,13 @@ package org.apache.kafka.connect.util; import org.apache.kafka.clients.NodeApiVersions; -import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.admin.AdminClientUnitTestEnv; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import 
org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.CreateTopicsResponse; @@ -47,7 +49,7 @@ public class TopicAdminTest { public void returnNullWithApiVersionMismatch() { final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); Cluster cluster = createCluster(1); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { env.kafkaClient().setNode(cluster.controller()); env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); @@ -62,14 +64,11 @@ public class TopicAdminTest { public void shouldNotCreateTopicWhenItAlreadyExists() { NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); Cluster cluster = createCluster(1); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { - env.kafkaClient().setNode(cluster.controller()); - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); - env.kafkaClient().prepareResponse(createTopicResponseWithAlreadyExists(newTopic)); - TopicAdmin admin = new TopicAdmin(null, env.adminClient()); - boolean created = admin.createTopic(newTopic); - assertFalse(created); + try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) { + TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList()); + mockAdminClient.addTopic(false, "myTopic", Collections.singletonList(topicPartitionInfo), null); + TopicAdmin admin = new TopicAdmin(null, mockAdminClient); + 
assertFalse(admin.createTopic(newTopic)); } } @@ -77,14 +76,9 @@ public class TopicAdminTest { public void shouldCreateTopicWhenItDoesNotExist() { NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); Cluster cluster = createCluster(1); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { - env.kafkaClient().setNode(cluster.controller()); - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); - env.kafkaClient().prepareResponse(createTopicResponse(newTopic)); - TopicAdmin admin = new TopicAdmin(null, env.adminClient()); - boolean created = admin.createTopic(newTopic); - assertTrue(created); + try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) { + TopicAdmin admin = new TopicAdmin(null, mockAdminClient); + assertTrue(admin.createTopic(newTopic)); } } @@ -93,12 +87,8 @@ public class TopicAdminTest { NewTopic newTopic1 = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); NewTopic newTopic2 = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); Cluster cluster = createCluster(1); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { - env.kafkaClient().setNode(cluster.controller()); - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); - env.kafkaClient().prepareResponse(createTopicResponse(newTopic1)); - TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) { + TopicAdmin admin = new TopicAdmin(null, mockAdminClient); Set<String> newTopicNames = admin.createTopics(newTopic1, newTopic2); assertEquals(1, newTopicNames.size()); assertEquals(newTopic2.name(), newTopicNames.iterator().next()); @@ -108,11 +98,8 
@@ public class TopicAdminTest { @Test public void shouldReturnFalseWhenSuppliedNullTopicDescription() { Cluster cluster = createCluster(1); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { - env.kafkaClient().setNode(cluster.controller()); - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); - TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) { + TopicAdmin admin = new TopicAdmin(null, mockAdminClient); boolean created = admin.createTopic(null); assertFalse(created); } @@ -120,7 +107,7 @@ public class TopicAdminTest { private Cluster createCluster(int numNodes) { HashMap<Integer, Node> nodes = new HashMap<>(); - for (int i = 0; i != numNodes; ++i) { + for (int i = 0; i < numNodes; ++i) { nodes.put(i, new Node(i, "localhost", 8121 + i)); } Cluster cluster = new Cluster("mockClusterId", nodes.values(), @@ -129,14 +116,6 @@ public class TopicAdminTest { return cluster; } - private CreateTopicsResponse createTopicResponse(NewTopic... topics) { - return createTopicResponse(new ApiError(Errors.NONE, ""), topics); - } - - private CreateTopicsResponse createTopicResponseWithAlreadyExists(NewTopic... topics) { - return createTopicResponse(new ApiError(Errors.TOPIC_ALREADY_EXISTS, "Topic already exists"), topics); - } - private CreateTopicsResponse createTopicResponseWithUnsupportedVersion(NewTopic... 
topics) { return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), topics); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 4250465..6b760c1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -186,11 +187,23 @@ public class StreamThreadTest { assertEquals(thread.state(), StreamThread.State.DEAD); } + private Cluster createCluster(int numNodes) { + HashMap<Integer, Node> nodes = new HashMap<>(); + for (int i = 0; i < numNodes; ++i) { + nodes.put(i, new Node(i, "localhost", 8121 + i)); + } + return new Cluster("mockClusterId", nodes.values(), + Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(), + Collections.<String>emptySet(), nodes.get(0)); + } + private StreamThread createStreamThread(final String clientId, final StreamsConfig config, final boolean eosEnabled) { if (eosEnabled) { clientSupplier.setApplicationIdForProducer(applicationId); } + clientSupplier.setClusterForAdminClient(createCluster(1)); + return StreamThread.create(internalTopologyBuilder, config, clientSupplier, diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java index c6b0f5f..dd32ad0 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java @@ -17,8 +17,7 @@ package org.apache.kafka.streams.tools; import kafka.tools.StreamsResetter; -import org.apache.kafka.clients.NodeApiVersions; -import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv; +import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.MockConsumer; @@ -27,8 +26,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.DeleteTopicsResponse; +import org.apache.kafka.common.TopicPartitionInfo; import org.junit.Before; import org.junit.Test; @@ -40,6 +38,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -235,20 +234,19 @@ public class StreamsResetterTest { } @Test - public void shouldDeleteTopic() { + public void shouldDeleteTopic() throws InterruptedException, ExecutionException { Cluster cluster = createCluster(1); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { - env.kafkaClient().setNode(cluster.controller()); - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); - env.kafkaClient().prepareResponse(new DeleteTopicsResponse(Collections.singletonMap(TOPIC, Errors.NONE))); - streamsResetter.doDelete(Collections.singletonList(topicPartition.topic()), env.adminClient()); + try (MockAdminClient adminClient = new 
MockAdminClient(cluster.nodes(), cluster.nodeById(0))) { + TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList()); + adminClient.addTopic(false, TOPIC, Collections.singletonList(topicPartitionInfo), null); + streamsResetter.doDelete(Collections.singletonList(TOPIC), adminClient); + assertEquals(Collections.emptySet(), adminClient.listTopics().names().get()); } } private Cluster createCluster(int numNodes) { HashMap<Integer, Node> nodes = new HashMap<>(); - for (int i = 0; i != numNodes; ++i) { + for (int i = 0; i < numNodes; ++i) { nodes.put(i, new Node(i, "localhost", 8121 + i)); } return new Cluster("mockClusterId", nodes.values(), diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java index ae83c60..1ec28fa 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java @@ -17,7 +17,7 @@ package org.apache.kafka.test; import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv; +import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -26,7 +26,6 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KafkaClientSupplier; import java.util.LinkedList; @@ -59,8 +58,7 @@ public class MockClientSupplier implements KafkaClientSupplier { @Override public AdminClient getAdminClient(final Map<String, Object> config) { - MockKafkaAdminClientEnv clientEnv = 
new MockKafkaAdminClientEnv(Time.SYSTEM, cluster, config); - return clientEnv.adminClient(); + return new MockAdminClient(cluster.nodes(), cluster.nodeById(0)); } @Override -- To stop receiving notification emails like this one, please contact commits@kafka.apache.org.