This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 46840cb  KAFKA-9216: Enforce that Connect’s internal topics use 
`compact` cleanup policy (#8828)
46840cb is described below

commit 46840cbfe51aab3f74449112b8ee6d89ca221bb2
Author: Randall Hauch <rha...@gmail.com>
AuthorDate: Wed Jun 10 22:39:52 2020 -0500

    KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup 
policy (#8828)
    
    This change adds a check to the KafkaConfigBackingStore, 
KafkaOffsetBackingStore, and KafkaStatusBackingStore to use the admin client to 
verify that the internal topics are compacted and do not use the `delete` 
cleanup policy.
    
    Connect already creates the internal topics with 
`cleanup.policy=compact` if the topics do not yet exist when the Connect 
workers are started; the new topics are always created as compacted, 
overwriting any user-specified `cleanup.policy`. However, if the topics already 
exist, the worker did not previously verify that they were compacted, for 
example when a user manually creates the internal topics before starting 
Connect or manually changes the topic settings after the fact.
    
    The current change helps guard against users running Connect with topics 
that have delete cleanup policy enabled, which will remove all connector 
configurations, source offsets, and connector & task statuses that are older 
than the retention time. This means that, for example, the configuration for a 
long-running connector could be deleted by the broker, and this will cause 
restart issues upon a subsequent rebalance or restarting of Connect worker(s).
    
    Connect behavior requires that its internal topics are compacted and not 
deleted after some retention time. This additional check simply enforces the 
existing expectations, and therefore does not need a KIP.
    
    Author: Randall Hauch <rha...@gmail.com>
    Reviewer: Konstantine Karantasis <konstant...@confluent.io>, Chris Egerton 
<chr...@confluent.io>
---
 .../kafka/clients/admin/MockAdminClient.java       |  45 +++++
 .../kafka/connect/runtime/AbstractHerder.java      |   6 +
 .../org/apache/kafka/connect/runtime/Connect.java  |   4 +
 .../org/apache/kafka/connect/runtime/Herder.java   |   2 +
 .../runtime/distributed/DistributedHerder.java     |   4 +
 .../runtime/standalone/StandaloneHerder.java       |   2 +
 .../connect/storage/KafkaConfigBackingStore.java   |  10 +-
 .../connect/storage/KafkaOffsetBackingStore.java   |  11 +-
 .../connect/storage/KafkaStatusBackingStore.java   |  10 +-
 .../org/apache/kafka/connect/util/TopicAdmin.java  | 164 +++++++++++++++
 .../integration/InternalTopicsIntegrationTest.java | 138 +++++++++++++
 .../apache/kafka/connect/util/TopicAdminTest.java  | 219 +++++++++++++++++++++
 .../util/clusters/EmbeddedConnectCluster.java      |  22 ++-
 .../util/clusters/EmbeddedKafkaCluster.java        |   4 +-
 .../kafka/connect/util/clusters/WorkerHandle.java  |   9 +
 15 files changed, 645 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index c084c99..2b86d4f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -366,6 +366,51 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
+    public DescribeConfigsResult describeConfigs(Collection<ConfigResource> 
resources) {
+        Map<ConfigResource, KafkaFuture<Config>> topicConfigs = new 
HashMap<>();
+
+        if (timeoutNextRequests > 0) {
+            for (ConfigResource requestedResource : resources) {
+                KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new TimeoutException());
+                topicConfigs.put(requestedResource, future);
+            }
+
+            --timeoutNextRequests;
+            return new DescribeConfigsResult(topicConfigs);
+        }
+
+        for (ConfigResource requestedResource : resources) {
+            if (requestedResource.type() != ConfigResource.Type.TOPIC) {
+                continue;
+            }
+            for (Map.Entry<String, TopicMetadata> topicDescription : 
allTopics.entrySet()) {
+                String topicName = topicDescription.getKey();
+                if (topicName.equals(requestedResource.name()) && 
!topicDescription.getValue().markedForDeletion) {
+                    if 
(topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
+                        
topicDescription.getValue().fetchesRemainingUntilVisible--;
+                    } else {
+                        TopicMetadata topicMetadata = 
topicDescription.getValue();
+                        KafkaFutureImpl<Config> future = new 
KafkaFutureImpl<>();
+                        Collection<ConfigEntry> entries = new ArrayList<>();
+                        topicMetadata.configs.forEach((k, v) -> 
entries.add(new ConfigEntry(k, v)));
+                        future.complete(new Config(entries));
+                        topicConfigs.put(requestedResource, future);
+                        break;
+                    }
+                }
+            }
+            if (!topicConfigs.containsKey(requestedResource)) {
+                KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new 
UnknownTopicOrPartitionException("Resource " + requestedResource + " not 
found."));
+                topicConfigs.put(requestedResource, future);
+            }
+        }
+
+        return new DescribeConfigsResult(topicConfigs);
+    }
+
+    @Override
     synchronized public DeleteTopicsResult deleteTopics(Collection<String> 
topicsToDelete, DeleteTopicsOptions options) {
         Map<String, KafkaFuture<Void>> deleteTopicsResult = new HashMap<>();
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 08163cc..5e3cb86 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -94,6 +94,7 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
     protected final StatusBackingStore statusBackingStore;
     protected final ConfigBackingStore configBackingStore;
     private final ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy;
+    protected volatile boolean running = false;
 
     private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
 
@@ -132,6 +133,11 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
     }
 
     @Override
+    public boolean isRunning() {
+        return running; // 'running' is the volatile flag declared on this class; it starts out false
+    }
+
+    @Override
     public void onStartup(String connector) {
         statusBackingStore.put(new ConnectorStatus(connector, 
ConnectorStatus.State.RUNNING,
                 workerId, generation()));
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
index f08586f..80eef03 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -84,6 +84,10 @@ public class Connect {
         }
     }
 
+    public boolean isRunning() {
+        return herder.isRunning(); // this runtime is "running" exactly when its herder reports so
+    }
+
     // Visible for testing
     public URI restUrl() {
         return rest.serverUrl();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index cbd7f3f..c8b6b80 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -59,6 +59,8 @@ public interface Herder {
 
     void stop();
 
+    boolean isRunning();
+
     /**
      * Get a list of connectors currently running in this cluster. This is a 
full list of connectors in the cluster gathered
      * from the current configuration. However, note
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 7762f9f..c7051db 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -288,6 +288,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
             startServices();
 
             log.info("Herder started");
+            running = true;
 
             while (!stopping.get()) {
                 tick();
@@ -300,6 +301,8 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
         } catch (Throwable t) {
             log.error("Uncaught exception in herder work thread, exiting: ", 
t);
             Exit.exit(1);
+        } finally {
+            running = false;
         }
     }
 
@@ -638,6 +641,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
         }
 
         log.info("Herder stopped");
+        running = false;
     }
 
     @Override
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index ade3f9e..a2b082e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -92,6 +92,7 @@ public class StandaloneHerder extends AbstractHerder {
     public synchronized void start() {
         log.info("Herder starting");
         startServices();
+        running = true; // set only after startServices() has returned, so isRunning() is false until then
         log.info("Herder started");
     }
 
@@ -114,6 +115,7 @@ public class StandaloneHerder extends AbstractHerder {
             worker.stopConnector(connName);
         }
         stopServices();
+        running = false;
         log.info("Herder stopped");
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 9512e03..29299a8 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -492,7 +493,14 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
             public void run() {
                 log.debug("Creating admin client to manage Connect internal 
config topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-                    admin.createTopics(topicDescription);
+                    // Create the topic if it doesn't exist
+                    Set<String> newTopics = 
admin.createTopics(topicDescription);
+                    if (!newTopics.contains(topic)) {
+                        // It already existed, so check that the topic cleanup 
policy is compact only and not delete
+                        log.debug("Using admin client to check cleanup policy 
of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                        admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                                DistributedConfig.CONFIG_TOPIC_CONFIG, 
"connector configurations");
+                    }
                 }
             }
         };
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 1d26d65..8408f99 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Time;
@@ -40,6 +41,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -106,7 +108,14 @@ public class KafkaOffsetBackingStore implements 
OffsetBackingStore {
             public void run() {
                 log.debug("Creating admin client to manage Connect internal 
offset topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-                    admin.createTopics(topicDescription);
+                    // Create the topic if it doesn't exist
+                    Set<String> newTopics = 
admin.createTopics(topicDescription);
+                    if (!newTopics.contains(topic)) {
+                        // It already existed, so check that the topic cleanup 
policy is compact only and not delete
+                        log.debug("Using admin client to check cleanup policy 
for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                        admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                                DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
"source connector offsets");
+                    }
                 }
             }
         };
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 0b0f4a5..5d6057d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -197,7 +198,14 @@ public class KafkaStatusBackingStore implements 
StatusBackingStore {
             public void run() {
                 log.debug("Creating admin client to manage Connect internal 
status topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-                    admin.createTopics(topicDescription);
+                    // Create the topic if it doesn't exist
+                    Set<String> newTopics = 
admin.createTopics(topicDescription);
+                    if (!newTopics.contains(topic)) {
+                        // It already existed, so check that the topic cleanup 
policy is compact only and not delete
+                        log.debug("Using admin client to check cleanup policy 
of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                        admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                                DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, 
"connector and task statuses");
+                    }
                 }
             }
         };
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 4809b49..615e7a3 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -18,11 +18,16 @@ package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.CreateTopicsOptions;
+import org.apache.kafka.clients.admin.DescribeConfigsOptions;
 import org.apache.kafka.clients.admin.DescribeTopicsOptions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
@@ -39,13 +44,16 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 /**
  * Utility to simplify creating and managing topics via the {@link Admin}.
@@ -375,6 +383,162 @@ public class TopicAdmin implements AutoCloseable {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * <p>The policies are obtained from {@link #topicCleanupPolicy(String)}. If that returns an
+     * empty set the check is skipped with an informational log message rather than failing.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that 
specifies the topic name
+     * @param topicPurpose      a short description of what the topic stores; it is only used
+     *                          to build the error message
+     * @return true if the admin client could be used to verify the topic 
setting, or false if
+     *         the verification could not be performed, likely because the 
admin client principal
+     *         did not have the required permissions or because the broker was 
older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the 
required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String 
workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies.isEmpty()) {
+            log.info("Unable to use admin client to verify the cleanup policy 
of '{}' "
+                      + "topic is '{}', either because the broker is an older "
+                      + "version or because the Kafka principal used for 
Connect "
+                      + "internal topics does not have the required permission 
to "
+                      + "describe topic configurations.", topic, 
TopicConfig.CLEANUP_POLICY_COMPACT);
+            return false; // policy unknown: reported to the caller, not treated as an error
+        }
+        Set<String> expectedPolicies = 
Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+        if (!cleanupPolicies.equals(expectedPolicies)) {
+            String expectedPolicyStr = String.join(",", expectedPolicies);
+            String cleanupPolicyStr = String.join(",", cleanupPolicies);
+            String msg = String.format("Topic '%s' supplied via the '%s' 
property is required "
+                    + "to have '%s=%s' to guarantee consistency and durability 
of "
+                    + "%s, but found the topic currently has '%s=%s'. 
Continuing would likely "
+                    + "result in eventually losing %s and problems restarting 
this Connect "
+                    + "cluster in the future. Change the '%s' property in the "
+                    + "Connect worker configurations to use a topic with 
'%s=%s'.",
+                    topic, workerTopicConfig, 
TopicConfig.CLEANUP_POLICY_CONFIG, expectedPolicyStr,
+                    topicPurpose, TopicConfig.CLEANUP_POLICY_CONFIG, 
cleanupPolicyStr, topicPurpose,
+                    workerTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, 
expectedPolicyStr);
+            throw new ConfigException(msg);
+        }
+        return true;
+    }
+
+    /**
+     * Get the cleanup policy for a topic.
+     *
+     * <p>The value of the topic's cleanup policy setting is split on commas; each element is
+     * trimmed and lower-cased, and empty elements are dropped.
+     *
+     * @param topic the name of the topic
+     * @return the set of cleanup policies set for the topic; may be empty if 
the topic does not
+     *         exist or the topic's cleanup policy could not be retrieved
+     */
+    public Set<String> topicCleanupPolicy(String topic) {
+        Config topicConfig = describeTopicConfig(topic);
+        if (topicConfig == null) {
+            // No config came back: the topic does not exist, or its config could not be described
+            log.debug("Unable to find topic '{}' when getting cleanup policy", 
topic);
+            return Collections.emptySet();
+        }
+        ConfigEntry entry = topicConfig.get(CLEANUP_POLICY_CONFIG);
+        if (entry != null && entry.value() != null) {
+            String policyStr = entry.value();
+            log.debug("Found cleanup.policy={} for topic '{}'", policyStr, 
topic);
+            return Arrays.stream(policyStr.split(","))
+                         .map(String::trim)
+                         .filter(s -> !s.isEmpty())
+                         .map(String::toLowerCase)
+                         .collect(Collectors.toSet());
+        }
+        // This is unexpected, as the topic config should include the 
cleanup.policy even if
+        // the topic settings don't override the broker's log.cleanup.policy. 
But just to be safe.
+        log.debug("Found no cleanup.policy for topic '{}'", topic);
+        return Collections.emptySet();
+    }
+
+    /**
+     * Attempt to fetch the topic configuration for the given topic.
+     * Apache Kafka added support for describing topic configurations in 0.11.0.0, so this method
+     * works as expected with that and later versions. With brokers older than 0.11.0.0, this method
+     * is unable to get the topic configurations and always returns a null value.
+     *
+     * <p>A null value is also returned if the topic does not exist, if the topic name is null or
+     * blank, or if the principal is not authorized to describe the topic's configuration.
+     *
+     * @param topic the name of the topic for which the topic configuration should be obtained
+     * @return the topic configuration if it could be obtained, or null otherwise
+     * @throws RetriableException if a retriable error occurs, the operation takes too long, or the
+     *         thread is interrupted while attempting to perform this operation
+     * @throws ConnectException if a non retriable error occurs
+     */
+    public Config describeTopicConfig(String topic) {
+        if (topic == null) {
+            return null;
+        }
+        // describeTopicConfigs trims each name before querying, so its result is keyed by the
+        // trimmed name; look it up the same way so surrounding whitespace does not hide the entry.
+        return describeTopicConfigs(topic).get(topic.trim());
+    }
+
+    /**
+     * Attempt to fetch the topic configurations for the given topics.
+     * Apache Kafka added support for describing topic configurations in 
0.11.0.0, so this method
+     * works as expected with that and later versions. With brokers older than 
0.11.0.0, this method
+     * is unable get the topic configurations and always returns an empty set.
+     *
+     * <p>An entry with a null Config is placed into the resulting map for any 
topic that does
+     * not exist on the brokers.
+     *
+     * @param topicNames the topics to obtain configurations
+     * @return the map of topic configurations for each existing topic, or an 
empty map if none
+     * of the topics exist
+     * @throws RetriableException if a retriable error occurs, the operation 
takes too long, or the
+     *         thread is interrupted while attempting to perform this operation
+     * @throws ConnectException if a non retriable error occurs
+     */
+    public Map<String, Config> describeTopicConfigs(String... topicNames) {
+        if (topicNames == null) {
+            return Collections.emptyMap();
+        }
+        Collection<String> topics = Arrays.stream(topicNames)
+                                          .filter(Objects::nonNull)
+                                          .map(String::trim)
+                                          .filter(s -> !s.isEmpty())
+                                          .collect(Collectors.toList());
+        if (topics.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        String bootstrapServers = bootstrapServers();
+        String topicNameList = topics.stream().collect(Collectors.joining(", 
"));
+        Collection<ConfigResource> resources = topics.stream()
+                                                     .map(t -> new 
ConfigResource(ConfigResource.Type.TOPIC, t))
+                                                     
.collect(Collectors.toList());
+
+        Map<ConfigResource, KafkaFuture<Config>> newResults = 
admin.describeConfigs(resources, new DescribeConfigsOptions()).values();
+
+        // Iterate over each future so that we can handle individual failures 
like when some topics don't exist
+        Map<String, Config> result = new HashMap<>();
+        newResults.forEach((resource, configs) -> {
+            String topic = resource.name();
+            try {
+                result.put(topic, configs.get());
+            } catch (ExecutionException e) {
+                Throwable cause = e.getCause();
+                if (cause instanceof UnknownTopicOrPartitionException) {
+                    log.debug("Topic '{}' does not exist on the brokers at 
{}", topic, bootstrapServers);
+                    result.put(topic, null);
+                } else if (cause instanceof ClusterAuthorizationException || 
cause instanceof TopicAuthorizationException) {
+                    log.debug("Not authorized to describe topic config for 
topic '{}' on brokers at {}", topic, bootstrapServers);
+                } else if (cause instanceof UnsupportedVersionException) {
+                    log.debug("API to describe topic config for topic '{}' is 
unsupported on brokers at {}", topic, bootstrapServers);
+                } else if (cause instanceof TimeoutException) {
+                    String msg = String.format("Timed out while waiting to 
describe topic config for topic '%s' on brokers at %s",
+                            topic, bootstrapServers);
+                    throw new RetriableException(msg, e);
+                } else {
+                    String msg = String.format("Error while attempting to 
describe topic config for topic '%s' on brokers at %s",
+                            topic, bootstrapServers);
+                    throw new ConnectException(msg, e);
+                }
+            } catch (InterruptedException e) {
+                Thread.interrupted();
+                String msg = String.format("Interrupted while attempting to 
describe topic configs '%s'", topicNameList);
+                throw new RetriableException(msg, e);
+            }
+        });
+        return result;
+    }
+
     @Override
     public void close() {
         admin.close();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
index 191de84..d73d1c4 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
@@ -16,12 +16,15 @@
  */
 package org.apache.kafka.connect.integration;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.WorkerHandle;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
@@ -30,6 +33,8 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.assertFalse;
+
 /**
  * Integration test for the creation of internal topics.
  */
@@ -141,6 +146,139 @@ public class InternalTopicsIntegrationTest {
         connect.assertions().assertTopicsDoNotExist(configTopic(), 
offsetTopic());
     }
 
+    /**
+     * Verify that a Connect worker does not run while any of its internal topics is
+     * configured with a cleanup policy other than compact-only.
+     *
+     * The broker default cleanup policy is set to {@code delete}, three "good" topics are
+     * created with {@code compact}, and three "bad" topics are created with {@code delete},
+     * {@code delete,compact}, and no topic settings at all. A worker is then added
+     * repeatedly; after each attempt that is asserted to be not running, one more "bad" topic
+     * name in the worker properties is replaced with its "good" counterpart. Only the final
+     * attempt, which uses three "good" topics, is asserted to bring a worker up.
+     */
+    @Test
+    public void testFailToStartWhenInternalTopicsAreNotCompacted() throws 
InterruptedException {
+        // Change the broker default cleanup policy to something Connect 
doesn't like
+        brokerProps.put("log." + TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE);
+        // Start out using the improperly configured topics
+        workerProps.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "bad-config");
+        workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
"bad-offset");
+        workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, 
"bad-status");
+        
workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, 
"1");
+        
workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, 
"1");
+        
workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, 
"1");
+        int numWorkers = 0;
+        int numBrokers = 1;
+        connect = new 
EmbeddedConnectCluster.Builder().name("connect-cluster-1")
+                                                      .workerProps(workerProps)
+                                                      .numWorkers(numWorkers)
+                                                      .numBrokers(numBrokers)
+                                                      .brokerProps(brokerProps)
+                                                      .build();
+
+        // Start the brokers but not Connect
+        log.info("Starting {} Kafka brokers, but no Connect workers yet", 
numBrokers);
+        connect.start();
+        connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker 
did not start in time.");
+        log.info("Completed startup of {} Kafka broker. Expected Connect 
worker to fail", numBrokers);
+
+        // Create the good topics
+        connect.kafka().createTopic("good-config", 1, 1, 
compactCleanupPolicy());
+        connect.kafka().createTopic("good-offset", 1, 1, 
compactCleanupPolicy());
+        connect.kafka().createTopic("good-status", 1, 1, 
compactCleanupPolicy());
+
+        // Create the poorly-configured topics
+        connect.kafka().createTopic("bad-config", 1, 1, deleteCleanupPolicy());
+        connect.kafka().createTopic("bad-offset", 1, 1, 
compactAndDeleteCleanupPolicy());
+        connect.kafka().createTopic("bad-status", 1, 1, noTopicSettings());
+
+        // Check the topics
+        log.info("Verifying the internal topics for Connect were manually 
created");
+        connect.assertions().assertTopicsExist("good-config", "good-offset", 
"good-status", "bad-config", "bad-offset", "bad-status");
+
+        // Try to start one worker, with three bad topics
+        WorkerHandle worker = connect.addWorker(); // should have failed to 
start before returning
+        assertFalse(worker.isRunning());
+        assertFalse(connect.allWorkersRunning());
+        assertFalse(connect.anyWorkersRunning());
+        connect.removeWorker(worker);
+
+        // We rely upon the fact that we can change the worker properties 
before the workers are started
+        workerProps.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "good-config");
+
+        // Try to start one worker, with two bad topics remaining
+        worker = connect.addWorker(); // should have failed to start before 
returning
+        assertFalse(worker.isRunning());
+        assertFalse(connect.allWorkersRunning());
+        assertFalse(connect.anyWorkersRunning());
+        connect.removeWorker(worker);
+
+        // We rely upon the fact that we can change the worker properties 
before the workers are started
+        workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
"good-offset");
+
+        // Try to start one worker, with one bad topic remaining
+        worker = connect.addWorker(); // should have failed to start before 
returning
+        assertFalse(worker.isRunning());
+        assertFalse(connect.allWorkersRunning());
+        assertFalse(connect.anyWorkersRunning());
+        connect.removeWorker(worker);
+        // We rely upon the fact that we can change the worker properties 
before the workers are started
+        workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, 
"good-status");
+
+        // Try to start one worker, now using all good internal topics
+        connect.addWorker();
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Worker did not 
start in time.");
+    }
+
+    /**
+     * Verify that a Connect worker starts when the internal topics were created manually
+     * without any topic-level settings and the broker default {@code log.cleanup.policy}
+     * has been set to {@code compact}.
+     */
+    @Test
+    public void testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy() throws InterruptedException {
+        // Change the broker default cleanup policy to compact, which is good for Connect
+        brokerProps.put("log." + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
+        // Start out using the properly configured topics
+        workerProps.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
+        workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "offset-topic");
+        workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
+        workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
+        workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
+        workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
+        int numWorkers = 0;
+        int numBrokers = 1;
+        connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1")
+                                                      .workerProps(workerProps)
+                                                      .numWorkers(numWorkers)
+                                                      .numBrokers(numBrokers)
+                                                      .brokerProps(brokerProps)
+                                                      .build();
+
+        // Start the brokers but not Connect
+        log.info("Starting {} Kafka brokers, but no Connect workers yet", numBrokers);
+        connect.start();
+        connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time.");
+        // Unlike the "not compacted" test, the worker is expected to start successfully here
+        log.info("Completed startup of {} Kafka broker. Expected Connect worker to start", numBrokers);
+
+        // Create the valid internal topics w/o topic settings, so these will use the
+        // broker's log.cleanup.policy=compact (set above)
+        connect.kafka().createTopic("config-topic", 1, 1, noTopicSettings());
+        connect.kafka().createTopic("offset-topic", 1, 1, noTopicSettings());
+        connect.kafka().createTopic("status-topic", 1, 1, noTopicSettings());
+
+        // Check the topics
+        log.info("Verifying the internal topics for Connect were manually created");
+        connect.assertions().assertTopicsExist("config-topic", "offset-topic", "status-topic");
+
+        // Try to start one worker using valid internal topics
+        connect.addWorker();
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Worker did not start in time.");
+    }
+
+    /**
+     * Topic settings that set only the cleanup policy, to {@code compact}.
+     *
+     * @return a single-entry map keyed by {@link TopicConfig#CLEANUP_POLICY_CONFIG}
+     */
+    protected Map<String, String> compactCleanupPolicy() {
+        return Collections.singletonMap(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT);
+    }
+
+    /**
+     * Topic settings that set only the cleanup policy, to {@code delete}.
+     *
+     * @return a single-entry map keyed by {@link TopicConfig#CLEANUP_POLICY_CONFIG}
+     */
+    protected Map<String, String> deleteCleanupPolicy() {
+        return Collections.singletonMap(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE);
+    }
+
+    /**
+     * Topic settings with no entries, used by the tests above to create topics without any
+     * explicit topic-level configuration.
+     *
+     * @return an empty map
+     */
+    protected Map<String, String> noTopicSettings() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Topic settings whose cleanup policy lists both {@code delete} and {@code compact},
+     * in that order and separated by a comma.
+     *
+     * @return a new mutable map holding the single cleanup policy entry
+     */
+    protected Map<String, String> compactAndDeleteCleanupPolicy() {
+        String bothPolicies = TopicConfig.CLEANUP_POLICY_DELETE + "," + TopicConfig.CLEANUP_POLICY_COMPACT;
+        Map<String, String> settings = new HashMap<>();
+        settings.put(TopicConfig.CLEANUP_POLICY_CONFIG, bothPolicies);
+        return settings;
+    }
+
     protected void assertInternalTopicSettings() throws InterruptedException {
         DistributedConfig config = new DistributedConfig(workerProps);
         connect.assertions().assertTopicSettings(
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index caaa639..b655664 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
+import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
@@ -27,6 +28,9 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -36,11 +40,14 @@ import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -50,6 +57,8 @@ import java.util.concurrent.ExecutionException;
 import static 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -261,6 +270,187 @@ public class TopicAdminTest {
         }
     }
 
+    /**
+     * Calling {@code describeTopicConfigs()} with no topic names is expected to yield an
+     * empty result map.
+     */
+    @Test
+    public void describeTopicConfigShouldReturnEmptyMapWhenNoTopicsAreSpecified() {
+        final NewTopic topicSpec = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        final Cluster singleNodeCluster = createCluster(1);
+        try (AdminClientUnitTestEnv testEnv = new AdminClientUnitTestEnv(new MockTime(), singleNodeCluster)) {
+            testEnv.kafkaClient().prepareResponse(describeConfigsResponseWithUnsupportedVersion(topicSpec));
+            TopicAdmin topicAdmin = new TopicAdmin(null, testEnv.adminClient());
+            // No topic names are supplied here
+            Map<String, Config> described = topicAdmin.describeTopicConfigs();
+            assertTrue(described.isEmpty());
+        }
+    }
+
+    /**
+     * When the prepared describe-configs response carries an {@code UNSUPPORTED_VERSION}
+     * error, {@code describeTopicConfigs} is expected to return an empty map.
+     */
+    @Test
+    public void describeTopicConfigShouldReturnEmptyMapWhenUnsupportedVersionFailure() {
+        final NewTopic topicSpec = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        final Cluster singleNodeCluster = createCluster(1);
+        try (AdminClientUnitTestEnv testEnv = new AdminClientUnitTestEnv(new MockTime(), singleNodeCluster)) {
+            testEnv.kafkaClient().prepareResponse(describeConfigsResponseWithUnsupportedVersion(topicSpec));
+            TopicAdmin topicAdmin = new TopicAdmin(null, testEnv.adminClient());
+            Map<String, Config> described = topicAdmin.describeTopicConfigs(topicSpec.name());
+            assertTrue(described.isEmpty());
+        }
+    }
+
+    /**
+     * When the prepared describe-configs response carries a
+     * {@code CLUSTER_AUTHORIZATION_FAILED} error, {@code describeTopicConfigs} is expected
+     * to return an empty map.
+     */
+    @Test
+    public void describeTopicConfigShouldReturnEmptyMapWhenClusterAuthorizationFailure() {
+        final NewTopic topicSpec = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        final Cluster singleNodeCluster = createCluster(1);
+        try (AdminClientUnitTestEnv testEnv = new AdminClientUnitTestEnv(new MockTime(), singleNodeCluster)) {
+            testEnv.kafkaClient().prepareResponse(describeConfigsResponseWithClusterAuthorizationException(topicSpec));
+            TopicAdmin topicAdmin = new TopicAdmin(null, testEnv.adminClient());
+            Map<String, Config> described = topicAdmin.describeTopicConfigs(topicSpec.name());
+            assertTrue(described.isEmpty());
+        }
+    }
+
+    /**
+     * When the prepared describe-configs response carries a
+     * {@code TOPIC_AUTHORIZATION_FAILED} error, {@code describeTopicConfigs} is expected
+     * to return an empty map.
+     */
+    @Test
+    public void describeTopicConfigShouldReturnEmptyMapWhenTopicAuthorizationFailure() {
+        final NewTopic topicSpec = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        final Cluster singleNodeCluster = createCluster(1);
+        try (AdminClientUnitTestEnv testEnv = new AdminClientUnitTestEnv(new MockTime(), singleNodeCluster)) {
+            testEnv.kafkaClient().prepareResponse(describeConfigsResponseWithTopicAuthorizationException(topicSpec));
+            TopicAdmin topicAdmin = new TopicAdmin(null, testEnv.adminClient());
+            Map<String, Config> described = topicAdmin.describeTopicConfigs(topicSpec.name());
+            assertTrue(described.isEmpty());
+        }
+    }
+
+    /**
+     * Describing a topic that was never added to the mock admin client is expected to yield
+     * a one-entry map whose value for that topic is {@code null}.
+     */
+    @Test
+    public void describeTopicConfigShouldReturnMapWithNullValueWhenTopicDoesNotExist() {
+        NewTopic topicSpec = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster singleNodeCluster = createCluster(1);
+        MockAdminClient mockClient = new MockAdminClient(singleNodeCluster.nodes(), singleNodeCluster.nodeById(0));
+        try (TopicAdmin topicAdmin = new TopicAdmin(null, mockClient)) {
+            Map<String, Config> described = topicAdmin.describeTopicConfigs(topicSpec.name());
+            assertFalse(described.isEmpty());
+            assertEquals(1, described.size());
+            assertNull(described.get("myTopic"));
+        }
+    }
+
+    /**
+     * Describing a topic that exists in the mock admin client is expected to yield a
+     * one-entry map with a non-null {@code Config} for that topic.
+     */
+    @Test
+    public void describeTopicConfigShouldReturnTopicConfigWhenTopicExists() {
+        final String topicName = "myTopic";
+        final Map<String, String> extraSettings = Collections.singletonMap("foo", "bar");
+        final NewTopic newTopic = TopicAdmin.defineTopic(topicName)
+                                            .config(extraSettings)
+                                            .partitions(1)
+                                            .compacted()
+                                            .build();
+        final Cluster singleNodeCluster = createCluster(1);
+        try (MockAdminClient mockClient = new MockAdminClient(singleNodeCluster.nodes(), singleNodeCluster.nodeById(0))) {
+            TopicPartitionInfo partitionZero = new TopicPartitionInfo(
+                    0, singleNodeCluster.nodeById(0), singleNodeCluster.nodes(), Collections.<Node>emptyList());
+            // NOTE(review): the topic is registered with null configs, so the per-entry check
+            // at the end may iterate over zero entries - confirm against MockAdminClient.
+            mockClient.addTopic(false, topicName, Collections.singletonList(partitionZero), null);
+            TopicAdmin topicAdmin = new TopicAdmin(null, mockClient);
+            Map<String, Config> described = topicAdmin.describeTopicConfigs(newTopic.name());
+            assertFalse(described.isEmpty());
+            assertEquals(1, described.size());
+            Config topicConfig = described.get("myTopic");
+            assertNotNull(topicConfig);
+            topicConfig.entries().forEach(entry -> assertEquals(newTopic.configs().get(entry.name()), entry.value()));
+        }
+    }
+
+    /**
+     * With an {@code UNSUPPORTED_VERSION} describe-configs response prepared,
+     * {@code verifyTopicCleanupPolicyOnlyCompact} is expected to return {@code false}.
+     */
+    @Test
+    public void verifyingTopicCleanupPolicyShouldReturnFalseWhenBrokerVersionIsUnsupported() {
+        final NewTopic topicSpec = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        final Cluster singleNodeCluster = createCluster(1);
+        try (AdminClientUnitTestEnv testEnv = new AdminClientUnitTestEnv(new MockTime(), singleNodeCluster)) {
+            testEnv.kafkaClient().prepareResponse(describeConfigsResponseWithUnsupportedVersion(topicSpec));
+            TopicAdmin topicAdmin = new TopicAdmin(null, testEnv.adminClient());
+            assertFalse(topicAdmin.verifyTopicCleanupPolicyOnlyCompact("myTopic", "worker.topic", "purpose"));
+        }
+    }
+
+    /**
+     * With a {@code CLUSTER_AUTHORIZATION_FAILED} describe-configs response prepared,
+     * {@code verifyTopicCleanupPolicyOnlyCompact} is expected to return {@code false}.
+     */
+    @Test
+    public void verifyingTopicCleanupPolicyShouldReturnFalseWhenClusterAuthorizationError() {
+        final NewTopic topicSpec = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        final Cluster singleNodeCluster = createCluster(1);
+        try (AdminClientUnitTestEnv testEnv = new AdminClientUnitTestEnv(new MockTime(), singleNodeCluster)) {
+            testEnv.kafkaClient().prepareResponse(describeConfigsResponseWithClusterAuthorizationException(topicSpec));
+            TopicAdmin topicAdmin = new TopicAdmin(null, testEnv.adminClient());
+            assertFalse(topicAdmin.verifyTopicCleanupPolicyOnlyCompact("myTopic", "worker.topic", "purpose"));
+        }
+    }
+
+    /**
+     * With a {@code TOPIC_AUTHORIZATION_FAILED} describe-configs response prepared,
+     * {@code verifyTopicCleanupPolicyOnlyCompact} is expected to return {@code false}.
+     */
+    @Test
+    public void verifyingTopicCleanupPolicyShouldReturnFalseWhenTopicAuthorizationError() {
+        final NewTopic topicSpec = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        final Cluster singleNodeCluster = createCluster(1);
+        try (AdminClientUnitTestEnv testEnv = new AdminClientUnitTestEnv(new MockTime(), singleNodeCluster)) {
+            testEnv.kafkaClient().prepareResponse(describeConfigsResponseWithTopicAuthorizationException(topicSpec));
+            TopicAdmin topicAdmin = new TopicAdmin(null, testEnv.adminClient());
+            assertFalse(topicAdmin.verifyTopicCleanupPolicyOnlyCompact("myTopic", "worker.topic", "purpose"));
+        }
+    }
+
+    /**
+     * A topic registered with {@code cleanup.policy=compact} is expected to pass
+     * {@code verifyTopicCleanupPolicyOnlyCompact}, which should return {@code true}.
+     */
+    @Test
+    public void verifyingTopicCleanupPolicyShouldReturnTrueWhenTopicHasCorrectPolicy() {
+        final String topicName = "myTopic";
+        final Map<String, String> compactOnly = Collections.singletonMap("cleanup.policy", "compact");
+        final Cluster singleNodeCluster = createCluster(1);
+        try (MockAdminClient mockClient = new MockAdminClient(singleNodeCluster.nodes(), singleNodeCluster.nodeById(0))) {
+            TopicPartitionInfo partitionZero = new TopicPartitionInfo(
+                    0, singleNodeCluster.nodeById(0), singleNodeCluster.nodes(), Collections.<Node>emptyList());
+            mockClient.addTopic(false, topicName, Collections.singletonList(partitionZero), compactOnly);
+            TopicAdmin topicAdmin = new TopicAdmin(null, mockClient);
+            assertTrue(topicAdmin.verifyTopicCleanupPolicyOnlyCompact("myTopic", "worker.topic", "purpose"));
+        }
+    }
+
+    /**
+     * A topic registered with {@code cleanup.policy=delete} is expected to make
+     * {@code verifyTopicCleanupPolicyOnlyCompact} throw a {@link ConfigException} whose
+     * message mentions consistency and durability.
+     */
+    @Test
+    public void verifyingTopicCleanupPolicyShouldFailWhenTopicHasDeletePolicy() {
+        final String topicName = "myTopic";
+        final Map<String, String> deleteOnly = Collections.singletonMap("cleanup.policy", "delete");
+        final Cluster singleNodeCluster = createCluster(1);
+        try (MockAdminClient mockClient = new MockAdminClient(singleNodeCluster.nodes(), singleNodeCluster.nodeById(0))) {
+            TopicPartitionInfo partitionZero = new TopicPartitionInfo(
+                    0, singleNodeCluster.nodeById(0), singleNodeCluster.nodes(), Collections.<Node>emptyList());
+            mockClient.addTopic(false, topicName, Collections.singletonList(partitionZero), deleteOnly);
+            TopicAdmin topicAdmin = new TopicAdmin(null, mockClient);
+            ConfigException thrown = assertThrows(ConfigException.class,
+                () -> topicAdmin.verifyTopicCleanupPolicyOnlyCompact("myTopic", "worker.topic", "purpose"));
+            assertTrue(thrown.getMessage().contains("to guarantee consistency and durability"));
+        }
+    }
+
+    /**
+     * A topic registered with {@code cleanup.policy=delete,compact} is expected to make
+     * {@code verifyTopicCleanupPolicyOnlyCompact} throw a {@link ConfigException} whose
+     * message mentions consistency and durability.
+     */
+    @Test
+    public void verifyingTopicCleanupPolicyShouldFailWhenTopicHasDeleteAndCompactPolicy() {
+        final String topicName = "myTopic";
+        final Map<String, String> deleteAndCompact = Collections.singletonMap("cleanup.policy", "delete,compact");
+        final Cluster singleNodeCluster = createCluster(1);
+        try (MockAdminClient mockClient = new MockAdminClient(singleNodeCluster.nodes(), singleNodeCluster.nodeById(0))) {
+            TopicPartitionInfo partitionZero = new TopicPartitionInfo(
+                    0, singleNodeCluster.nodeById(0), singleNodeCluster.nodes(), Collections.<Node>emptyList());
+            mockClient.addTopic(false, topicName, Collections.singletonList(partitionZero), deleteAndCompact);
+            TopicAdmin topicAdmin = new TopicAdmin(null, mockClient);
+            ConfigException thrown = assertThrows(ConfigException.class,
+                () -> topicAdmin.verifyTopicCleanupPolicyOnlyCompact("myTopic", "worker.topic", "purpose"));
+            assertTrue(thrown.getMessage().contains("to guarantee consistency and durability"));
+        }
+    }
+
+    /**
+     * For a topic registered with {@code cleanup.policy=compact},
+     * {@code topicCleanupPolicy} is expected to return exactly one policy: compact.
+     */
+    @Test
+    public void verifyingGettingTopicCleanupPolicies() {
+        final String topicName = "myTopic";
+        final Map<String, String> compactOnly = Collections.singletonMap("cleanup.policy", "compact");
+        final Cluster singleNodeCluster = createCluster(1);
+        try (MockAdminClient mockClient = new MockAdminClient(singleNodeCluster.nodes(), singleNodeCluster.nodeById(0))) {
+            TopicPartitionInfo partitionZero = new TopicPartitionInfo(
+                    0, singleNodeCluster.nodeById(0), singleNodeCluster.nodes(), Collections.<Node>emptyList());
+            mockClient.addTopic(false, topicName, Collections.singletonList(partitionZero), compactOnly);
+            TopicAdmin topicAdmin = new TopicAdmin(null, mockClient);
+            Set<String> cleanupPolicies = topicAdmin.topicCleanupPolicy("myTopic");
+            assertEquals(1, cleanupPolicies.size());
+            String onlyPolicy = cleanupPolicies.iterator().next();
+            assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, onlyPolicy);
+        }
+    }
+
     private Cluster createCluster(int numNodes) {
         HashMap<Integer, Node> nodes = new HashMap<>();
         for (int i = 0; i < numNodes; ++i) {
@@ -363,4 +553,33 @@ public class TopicAdminTest {
         }
         return new MetadataResponse(response);
     }
+
+    /**
+     * Build a describe-configs response in which every given topic carries an
+     * {@code UNSUPPORTED_VERSION} error.
+     *
+     * @param topics the topics to include in the response
+     * @return the response produced by {@code describeConfigsResponse(ApiError, NewTopic...)}
+     */
+    private DescribeConfigsResponse 
describeConfigsResponseWithUnsupportedVersion(NewTopic... topics) {
+        return describeConfigsResponse(new 
ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not 
supported"), topics);
+    }
+
+    /**
+     * Build a describe-configs response in which every given topic carries a
+     * {@code CLUSTER_AUTHORIZATION_FAILED} error.
+     *
+     * @param topics the topics to include in the response
+     * @return the response produced by {@code describeConfigsResponse(ApiError, NewTopic...)}
+     */
+    private DescribeConfigsResponse describeConfigsResponseWithClusterAuthorizationException(NewTopic... topics) {
+        // The message describes the operation being mocked (describing configs, not creating topics)
+        return describeConfigsResponse(
+            new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Not authorized to describe topic config(s)"), topics);
+    }
+
+    /**
+     * Build a describe-configs response in which every given topic carries a
+     * {@code TOPIC_AUTHORIZATION_FAILED} error.
+     *
+     * @param topics the topics to include in the response
+     * @return the response produced by {@code describeConfigsResponse(ApiError, NewTopic...)}
+     */
+    private DescribeConfigsResponse describeConfigsResponseWithTopicAuthorizationException(NewTopic... topics) {
+        // The message describes the operation being mocked (describing configs, not creating topics)
+        return describeConfigsResponse(
+            new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "Not authorized to describe topic config(s)"), topics);
+    }
+
+    /**
+     * Build a {@link DescribeConfigsResponse} containing one topic resource per given topic,
+     * each carrying the supplied error and the topic's own configuration entries.
+     *
+     * @param error  the error to attach to every resource; {@code null} is treated as
+     *               {@code Errors.NONE} with an empty message
+     * @param topics the topics whose names and configs populate the response
+     * @return the response, built with a throttle time of 1000
+     */
+    private DescribeConfigsResponse describeConfigsResponse(ApiError error, NewTopic... topics) {
+        if (error == null) error = new ApiError(Errors.NONE, "");
+        DescribeConfigsResponse.ConfigSource source = DescribeConfigsResponse.ConfigSource.TOPIC_CONFIG;
+        Map<ConfigResource, DescribeConfigsResponse.Config> configs = new HashMap<>();
+        for (NewTopic topic : topics) {
+            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic.name());
+            Collection<DescribeConfigsResponse.ConfigEntry> entries = new ArrayList<>();
+            // A topic defined without any configs may report null rather than an empty map
+            Map<String, String> topicConfigs = topic.configs();
+            if (topicConfigs != null) {
+                // Add each entry to the collection; previously the entries were created and discarded
+                topicConfigs.forEach((k, v) -> entries.add(new DescribeConfigsResponse.ConfigEntry(
+                        k, v, source, false, false, Collections.emptySet()
+                )));
+            }
+            DescribeConfigsResponse.Config config = new DescribeConfigsResponse.Config(error, entries);
+            configs.put(resource, config);
+        }
+        return new DescribeConfigsResponse(1000, configs);
+    }
+
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 2e40769..5f6a729 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -183,7 +183,9 @@ public class EmbeddedConnectCluster {
         WorkerHandle toRemove = null;
         for (Iterator<WorkerHandle> it = connectCluster.iterator(); 
it.hasNext(); toRemove = it.next()) {
         }
-        removeWorker(toRemove);
+        if (toRemove != null) {
+            removeWorker(toRemove);
+        }
     }
 
     /**
@@ -212,6 +214,24 @@ public class EmbeddedConnectCluster {
         }
     }
 
+    /**
+     * Report whether at least one worker in this Connect cluster is currently running.
+     *
+     * @return true as soon as a running worker is found, or false if none is running
+     */
+    public boolean anyWorkersRunning() {
+        for (WorkerHandle handle : workers()) {
+            if (handle.isRunning()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Report whether every worker in this Connect cluster is currently running.
+     *
+     * @return false as soon as a non-running worker is found, or true otherwise
+     *         (including when there are no workers)
+     */
+    public boolean allWorkersRunning() {
+        for (WorkerHandle handle : workers()) {
+            if (!handle.isRunning()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     @SuppressWarnings("deprecation")
     public void startConnect() {
         log.info("Starting Connect cluster '{}' with {} workers", 
connectClusterName, numInitialWorkers);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index b6c96f1..9af828e 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -174,7 +174,9 @@ public class EmbeddedKafkaCluster extends ExternalResource {
 
     private void stop(boolean deleteLogDirs, boolean stopZK) {
         try {
-            producer.close();
+            if (producer != null) {
+                producer.close();
+            }
         } catch (Exception e) {
             log.error("Could not shutdown producer ", e);
             throw new RuntimeException("Could not shutdown producer", e);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
index 64b2b12..4d94794 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
@@ -58,6 +58,15 @@ public class WorkerHandle {
     }
 
     /**
+     * Determine if this worker is running.
+     * This simply delegates to {@code isRunning()} on the wrapped {@code worker}.
+     *
+     * @return true if the worker is running, or false otherwise
+     */
+    public boolean isRunning() {
+        return worker.isRunning();
+    }
+
+    /**
      * Get the workers's name corresponding to this handle.
      *
      * @return the worker's name

Reply via email to