This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 03077678e6e46495fa1d51ea7cb553778de6e9fc
Author: Randall Hauch <rha...@gmail.com>
AuthorDate: Tue Feb 9 11:09:41 2021 -0600

    KAFKA-10021: Changed Kafka backing stores to use shared admin client to get 
end offsets and create topics (#9780)
    
    The existing `Kafka*BackingStore` classes used by Connect all use 
`KafkaBasedLog`, which needs to frequently get the end offsets for the internal 
topic to know whether it is caught up. `KafkaBasedLog` uses its consumer to 
get the end offsets and to consume the records from the topic.
    
    However, the Connect internal topics are often written very infrequently. 
This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` 
classes is already caught up and its last consumer poll is waiting for new 
records to appear, the call to the consumer to fetch end offsets will block 
until the consumer returns after a new record is written (unlikely) or the 
consumer’s `fetch.max.wait.ms` timeout (default 500ms) elapses and the 
consumer returns no more records. In other words, the  [...]
    
    Instead, we want `KafkaBasedLog.readToEnd()` to always return quickly when 
the log is already caught up. The best way to do this is to have `KafkaBasedLog` 
use the admin client (rather than the consumer) to fetch end offsets for the 
internal topic. The consumer and the admin client both use the same 
`ListOffsets` broker API, so the functionality is ultimately the same, but we 
no longer block on any ongoing consumer activity.
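
    As a rough, hedged illustration (not part of the patch itself), fetching end 
offsets via the admin client boils down to a `ListOffsets` request with 
`OffsetSpec.latest()` for each partition. The helper below is a minimal sketch 
that uses only public `Admin` APIs; the class and method names are hypothetical:

        import java.util.HashMap;
        import java.util.Map;
        import java.util.Set;
        import java.util.concurrent.ExecutionException;
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.ListOffsetsResult;
        import org.apache.kafka.clients.admin.OffsetSpec;
        import org.apache.kafka.common.TopicPartition;

        public class EndOffsetsSketch {
            // Fetch the latest (end) offset for each partition using the admin client.
            // Unlike Consumer#endOffsets, this does not contend with an in-flight poll().
            static Map<TopicPartition, Long> endOffsets(Admin admin, Set<TopicPartition> partitions)
                    throws ExecutionException, InterruptedException {
                Map<TopicPartition, OffsetSpec> specs = new HashMap<>();
                for (TopicPartition tp : partitions) {
                    specs.put(tp, OffsetSpec.latest());
                }
                ListOffsetsResult result = admin.listOffsets(specs);
                Map<TopicPartition, Long> offsets = new HashMap<>();
                for (TopicPartition tp : partitions) {
                    offsets.put(tp, result.partitionResult(tp).get().offset());
                }
                return offsets;
            }
        }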
    
    Each Connect distributed runtime includes three instances of the 
`Kafka*BackingStore` classes, which means we have three instances of 
`KafkaBasedLog`. We don't want three admin client instances; instead, all three 
`KafkaBasedLog` instances should share a single admin client. In fact, each 
`Kafka*BackingStore` instance currently creates, uses, and closes an admin 
client instance when it checks and initializes that store's internal topic. If 
we change `Kafka*Backin [...]
    
    The final challenge is that `KafkaBasedLog` has been used by projects 
outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public 
API for Connect, we can make these changes in ways that are backward 
compatible: create new constructors and deprecate the old constructors. Connect 
can be changed to only use the new constructors, and this will give time for 
any downstream users to make changes.
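
    To make the deprecation path concrete, here is a hedged sketch (not taken 
from the patch) of how a downstream user of `KafkaBasedLog` might move from the 
deprecated constructor to the new one that takes a `TopicAdmin` supplier; the 
caller is assumed to own and eventually close the `SharedTopicAdmin`:

        import java.util.Map;
        import org.apache.kafka.clients.admin.NewTopic;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.common.utils.Time;
        import org.apache.kafka.connect.util.Callback;
        import org.apache.kafka.connect.util.KafkaBasedLog;
        import org.apache.kafka.connect.util.SharedTopicAdmin;

        public class KafkaBasedLogMigrationSketch {
            KafkaBasedLog<String, byte[]> create(String topic,
                                                 Map<String, Object> producerProps,
                                                 Map<String, Object> consumerProps,
                                                 SharedTopicAdmin sharedAdmin,
                                                 NewTopic topicDescription,
                                                 Callback<ConsumerRecord<String, byte[]>> consumed) {
                // Deprecated style: no admin supplier; the init Runnable had to create
                // (and close) its own admin client just to set up the internal topic.
                //   new KafkaBasedLog<>(topic, producerProps, consumerProps, consumed,
                //                       Time.SYSTEM, () -> { /* create topic */ });

                // New style: pass the shared admin supplier, and let the init function
                // receive the TopicAdmin so it no longer creates its own client.
                return new KafkaBasedLog<>(topic, producerProps, consumerProps, sharedAdmin,
                        consumed, Time.SYSTEM, admin -> admin.createTopics(topicDescription));
            }
        }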
    
    These changes are implemented as follows:
    1. Add a `KafkaBasedLog` constructor that accepts a supplier from which it 
can get an admin instance, and deprecate the old constructor. We need a 
supplier rather than an instance because `KafkaBasedLog` is instantiated before 
Connect starts up, so the admin instance should be created only when it is 
needed. At the same time, change the existing init function parameter from a 
no-arg function to one that accepts an admin instance as an argument, allowing 
that init fun [...]
    2. Add to each `Kafka*BackingStore` class a new constructor with the same 
parameters plus an admin supplier, and deprecate the old constructor. When a 
class instantiates its `KafkaBasedLog`, it passes the admin supplier along with 
an init function that takes an admin instance.
    3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` 
(and underlying Admin client) when required, and closes the admin objects when 
the `SharedTopicAdmin` is closed.
    4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate 
the logic of fetching end offsets using the admin client, simplifying the logic 
in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to 
test that logic.
    5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance 
(which is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, 
passing the `SharedTopicAdmin` (which is an admin supplier) to all three 
`Kafka*BackingStore` objects, and finally always closing the `SharedTopicAdmin` 
upon termination; see the sketch after this list. (Shutdown of the worker 
occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` 
to take in its constructor additional `AutoCloseable` objects that [...]
    6. Change `MirrorMaker` similarly to `ConnectDistributed`.
    7. Change existing unit tests to no longer use deprecated constructors.
    8. Add unit tests for new functionality.
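
    For the distributed worker, the net effect of steps 3 and 5 is roughly the 
following wiring (a simplified, hedged sketch; the real `ConnectDistributed` 
code in the diff below also configures each store, adds metrics context 
properties to the admin config, and builds the `Worker`):

        import java.util.HashMap;
        import java.util.Map;
        import org.apache.kafka.common.utils.Time;
        import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
        import org.apache.kafka.connect.runtime.Worker;
        import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
        import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
        import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
        import org.apache.kafka.connect.storage.Converter;
        import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
        import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
        import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
        import org.apache.kafka.connect.util.SharedTopicAdmin;

        public class SharedAdminWiringSketch {
            DistributedHerder wire(DistributedConfig config, Worker worker, String kafkaClusterId,
                                   Converter converter, WorkerConfigTransformer configTransformer,
                                   String restUrl, ConnectorClientConfigOverridePolicy policy) {
                // One admin client, created lazily on first use and shared by all three stores.
                Map<String, Object> adminProps = new HashMap<>(config.originals());
                SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);

                KafkaOffsetBackingStore offsetStore = new KafkaOffsetBackingStore(sharedAdmin);
                KafkaStatusBackingStore statusStore =
                        new KafkaStatusBackingStore(Time.SYSTEM, converter, sharedAdmin);
                KafkaConfigBackingStore configStore =
                        new KafkaConfigBackingStore(converter, config, configTransformer, sharedAdmin);

                // The herder owns the shared admin's lifecycle: it closes it in stopServices(),
                // after its own services and the backing stores have been stopped.
                return new DistributedHerder(config, Time.SYSTEM, worker, kafkaClusterId,
                        statusStore, configStore, restUrl, policy, sharedAdmin);
            }
        }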
    
    Author: Randall Hauch <rha...@gmail.com>
    Reviewer: Konstantine Karantasis <konstant...@confluent.io>
---
 .../apache/kafka/connect/mirror/MirrorMaker.java   |  17 +-
 .../kafka/connect/cli/ConnectDistributed.java      |  18 +-
 .../runtime/distributed/DistributedHerder.java     |  37 ++-
 .../connect/storage/KafkaConfigBackingStore.java   |  33 ++-
 .../connect/storage/KafkaOffsetBackingStore.java   |  37 ++-
 .../connect/storage/KafkaStatusBackingStore.java   |  33 ++-
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  73 +++++-
 .../kafka/connect/util/SharedTopicAdmin.java       | 145 +++++++++++
 .../org/apache/kafka/connect/util/TopicAdmin.java  |  67 +++++
 .../runtime/distributed/DistributedHerderTest.java |  13 +-
 .../storage/KafkaConfigBackingStoreTest.java       |   6 +-
 .../storage/KafkaOffsetBackingStoreTest.java       |   6 +-
 .../kafka/connect/util/SharedTopicAdminTest.java   | 112 +++++++++
 .../apache/kafka/connect/util/TopicAdminTest.java  | 275 ++++++++++++++++++++-
 14 files changed, 808 insertions(+), 64 deletions(-)

diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index a975f31..e13873e 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.util.ConnectUtils;
 import 
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 
+import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -233,20 +234,28 @@ public class MirrorMaker {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new 
DistributedConfig(workerProps);
         String kafkaClusterId = 
ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+        // Create the admin client to be shared by all backing stores for this 
herder
+        Map<String, Object> adminProps = new HashMap<>(config.originals());
+        ConnectUtils.addMetricsContextProperties(adminProps, 
distributedConfig, kafkaClusterId);
+        SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
+        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(sharedAdmin);
         offsetBackingStore.configure(distributedConfig);
         Worker worker = new Worker(workerId, time, plugins, distributedConfig, 
offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
         statusBackingStore.configure(distributedConfig);
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 distributedConfig,
-                configTransformer);
+                configTransformer,
+                sharedAdmin);
+        // Pass the shared admin to the distributed herder as an additional 
AutoCloseable object that should be closed when the
+        // herder is stopped. MirrorMaker has multiple herders, and having the 
herder own the close responsibility is much easier than
+        // tracking the various shared admin objects in this class.
         Herder herder = new DistributedHerder(distributedConfig, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY);
+                advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);
         herders.put(sourceAndTarget, herder);
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 22c1ad8..8d93e79 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -36,12 +36,14 @@ import 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
 import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.ConnectUtils;
+import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -101,7 +103,12 @@ public class ConnectDistributed {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+        // Create the admin client to be shared by all backing stores.
+        Map<String, Object> adminProps = new HashMap<>(config.originals());
+        ConnectUtils.addMetricsContextProperties(adminProps, config, 
kafkaClusterId);
+        SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
+
+        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(sharedAdmin);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy = plugins.newPlugin(
@@ -112,17 +119,20 @@ public class ConnectDistributed {
         WorkerConfigTransformer configTransformer = worker.configTransformer();
 
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
         statusBackingStore.configure(config);
 
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 config,
-                configTransformer);
+                configTransformer,
+                sharedAdmin);
 
+        // Pass the shared admin to the distributed herder as an additional 
AutoCloseable object that should be closed when the
+        // herder is stopped. This is easier than having to track and own the 
lifecycle ourselves.
         DistributedHerder herder = new DistributedHerder(config, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl.toString(), connectorClientConfigOverridePolicy);
+                advertisedUrl.toString(), connectorClientConfigOverridePolicy, 
sharedAdmin);
 
         final Connect connect = new Connect(herder, rest);
         log.info("Kafka Connect distributed worker initialization took {}ms", 
time.hiResClockMs() - initStart);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 7c6e8a6..16dfbf9 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
@@ -66,6 +67,7 @@ import javax.crypto.KeyGenerator;
 import javax.crypto.SecretKey;
 import javax.ws.rs.core.Response;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -138,6 +140,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
 
     private final Time time;
     private final HerderMetrics herderMetrics;
+    private final List<AutoCloseable> uponShutdown;
 
     private final String workerGroupId;
     private final int workerSyncTimeoutMs;
@@ -185,6 +188,22 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
 
     private final DistributedConfig config;
 
+    /**
+     * Create a herder that will form a Connect cluster with other {@link 
DistributedHerder} instances (in this or other JVMs)
+     * that have the same group ID.
+     *
+     * @param config             the configuration for the worker; may not be 
null
+     * @param time               the clock to use; may not be null
+     * @param worker             the {@link Worker} instance to use; may not 
be null
+     * @param kafkaClusterId     the identifier of the Kafka cluster to use 
for internal topics; may not be null
+     * @param statusBackingStore the backing store for statuses; may not be 
null
+     * @param configBackingStore the backing store for connector 
configurations; may not be null
+     * @param restUrl            the URL of this herder's REST API; may not be 
null
+     * @param connectorClientConfigOverridePolicy the policy specifying the 
client configuration properties that may be overridden
+     *                                            in connector configurations; 
may not be null
+     * @param uponShutdown       any {@link AutoCloseable} objects that should 
be closed when this herder is {@link #stop() stopped},
+     *                           after all services and resources owned by 
this herder are stopped
+     */
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
@@ -192,9 +211,10 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl,
-                             ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy) {
+                             ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
+                             AutoCloseable... uponShutdown) {
         this(config, worker, worker.workerId(), kafkaClusterId, 
statusBackingStore, configBackingStore, null, restUrl, worker.metrics(),
-             time, connectorClientConfigOverridePolicy);
+             time, connectorClientConfigOverridePolicy, uponShutdown);
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
 
@@ -209,7 +229,8 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                       String restUrl,
                       ConnectMetrics metrics,
                       Time time,
-                      ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy) {
+                      ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
+                      AutoCloseable... uponShutdown) {
         super(worker, workerId, kafkaClusterId, statusBackingStore, 
configBackingStore, connectorClientConfigOverridePolicy);
 
         this.time = time;
@@ -223,6 +244,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
         this.keySignatureVerificationAlgorithms = 
config.getList(DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
         this.keyGenerator = config.getInternalRequestKeyGenerator();
         this.isTopicTrackingEnabled = 
config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.uponShutdown = Arrays.asList(uponShutdown);
 
         String clientIdConfig = 
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
         String clientId = clientIdConfig.length() <= 0 ? "connect-" + 
CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
@@ -677,6 +699,15 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
     }
 
     @Override
+    protected void stopServices() {
+        try {
+            super.stopServices();
+        } finally {
+            this.uponShutdown.forEach(closeable -> 
Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : 
"<unknown>"));
+        }
+    }
+
+    @Override
     public void stop() {
         log.info("Herder stopping");
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 685c3e0..d4e6358 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -62,6 +62,7 @@ import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
 /**
  * <p>
@@ -224,6 +225,7 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
     // Connector and task configs: name or id -> config map
     private final Map<String, Map<String, String>> connectorConfigs = new 
HashMap<>();
     private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new 
HashMap<>();
+    private final Supplier<TopicAdmin> topicAdminSupplier;
 
     // Set of connectors where we saw a task commit with an incomplete set of 
task config updates, indicating the data
     // is in an inconsistent state and we cannot safely use them until they 
have been refreshed.
@@ -241,11 +243,17 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
 
     private final WorkerConfigTransformer configTransformer;
 
+    @Deprecated
     public KafkaConfigBackingStore(Converter converter, WorkerConfig config, 
WorkerConfigTransformer configTransformer) {
+        this(converter, config, configTransformer, null);
+    }
+
+    public KafkaConfigBackingStore(Converter converter, WorkerConfig config, 
WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier) {
         this.lock = new Object();
         this.started = false;
         this.converter = converter;
         this.offset = -1;
+        this.topicAdminSupplier = adminSupplier;
 
         this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
         if (this.topic == null || this.topic.trim().length() == 0)
@@ -471,6 +479,7 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, 
clusterId);
+        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? 
topicAdminSupplier : () -> new TopicAdmin(adminProps);
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) 
config).configStorageTopicSettings()
                                             : Collections.emptyMap();
@@ -481,27 +490,25 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
                 
.replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG))
                 .build();
 
-        return createKafkaBasedLog(topic, producerProps, consumerProps, new 
ConsumeCallback(), topicDescription, adminProps);
+        return createKafkaBasedLog(topic, producerProps, consumerProps, new 
ConsumeCallback(), topicDescription, adminSupplier);
     }
 
     private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, 
Map<String, Object> producerProps,
                                                               Map<String, 
Object> consumerProps,
                                                               
Callback<ConsumerRecord<String, byte[]>> consumedCallback,
-                                                              final NewTopic 
topicDescription, final Map<String, Object> adminProps) {
-        Runnable createTopics = () -> {
+                                                              final NewTopic 
topicDescription, Supplier<TopicAdmin> adminSupplier) {
+        java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
             log.debug("Creating admin client to manage Connect internal config 
topic");
-            try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-                // Create the topic if it doesn't exist
-                Set<String> newTopics = admin.createTopics(topicDescription);
-                if (!newTopics.contains(topic)) {
-                    // It already existed, so check that the topic cleanup 
policy is compact only and not delete
-                    log.debug("Using admin client to check cleanup policy of 
'{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
-                    admin.verifyTopicCleanupPolicyOnlyCompact(topic,
-                            DistributedConfig.CONFIG_TOPIC_CONFIG, "connector 
configurations");
-                }
+            // Create the topic if it doesn't exist
+            Set<String> newTopics = admin.createTopics(topicDescription);
+            if (!newTopics.contains(topic)) {
+                // It already existed, so check that the topic cleanup policy 
is compact only and not delete
+                log.debug("Using admin client to check cleanup policy of '{}' 
topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                        DistributedConfig.CONFIG_TOPIC_CONFIG, "connector 
configurations");
             }
         };
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
consumedCallback, Time.SYSTEM, createTopics);
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index a7e7c3b..26b47f9 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -41,11 +41,13 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
 /**
  * <p>
@@ -62,6 +64,16 @@ public class KafkaOffsetBackingStore implements 
OffsetBackingStore {
 
     private KafkaBasedLog<byte[], byte[]> offsetLog;
     private HashMap<ByteBuffer, ByteBuffer> data;
+    private final Supplier<TopicAdmin> topicAdminSupplier;
+
+    @Deprecated
+    public KafkaOffsetBackingStore() {
+        this.topicAdminSupplier = null;
+    }
+
+    public KafkaOffsetBackingStore(Supplier<TopicAdmin> topicAdmin) {
+        this.topicAdminSupplier = Objects.requireNonNull(topicAdmin);
+    }
 
     @Override
     public void configure(final WorkerConfig config) {
@@ -86,6 +98,7 @@ public class KafkaOffsetBackingStore implements 
OffsetBackingStore {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, 
clusterId);
+        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? 
topicAdminSupplier : () -> new TopicAdmin(adminProps);
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) 
config).offsetStorageTopicSettings()
                                             : Collections.emptyMap();
@@ -96,27 +109,25 @@ public class KafkaOffsetBackingStore implements 
OffsetBackingStore {
                 
.replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG))
                 .build();
 
-        offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, 
consumedCallback, topicDescription, adminProps);
+        offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, 
consumedCallback, topicDescription, adminSupplier);
     }
 
     private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, 
Map<String, Object> producerProps,
                                                               Map<String, 
Object> consumerProps,
                                                               
Callback<ConsumerRecord<byte[], byte[]>> consumedCallback,
-                                                              final NewTopic 
topicDescription, final Map<String, Object> adminProps) {
-        Runnable createTopics = () -> {
+                                                              final NewTopic 
topicDescription, Supplier<TopicAdmin> adminSupplier) {
+        java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
             log.debug("Creating admin client to manage Connect internal offset 
topic");
-            try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-                // Create the topic if it doesn't exist
-                Set<String> newTopics = admin.createTopics(topicDescription);
-                if (!newTopics.contains(topic)) {
-                    // It already existed, so check that the topic cleanup 
policy is compact only and not delete
-                    log.debug("Using admin client to check cleanup policy for 
'{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
-                    admin.verifyTopicCleanupPolicyOnlyCompact(topic,
-                            DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
"source connector offsets");
-                }
+            // Create the topic if it doesn't exist
+            Set<String> newTopics = admin.createTopics(topicDescription);
+            if (!newTopics.contains(topic)) {
+                // It already existed, so check that the topic cleanup policy 
is compact only and not delete
+                log.debug("Using admin client to check cleanup policy for '{}' 
topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                        DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source 
connector offsets");
             }
         };
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
consumedCallback, Time.SYSTEM, createTopics);
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
     }
 
     @Override
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index b1c6a2f..efa405f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -61,6 +61,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
 
 /**
  * StatusBackingStore implementation which uses a compacted topic for storage
@@ -128,17 +129,24 @@ public class KafkaStatusBackingStore implements 
StatusBackingStore {
     protected final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
     protected final Map<String, CacheEntry<ConnectorStatus>> connectors;
     protected final ConcurrentMap<String, ConcurrentMap<String, TopicStatus>> 
topics;
+    private final Supplier<TopicAdmin> topicAdminSupplier;
 
     private String statusTopic;
     private KafkaBasedLog<String, byte[]> kafkaLog;
     private int generation;
 
+    @Deprecated
     public KafkaStatusBackingStore(Time time, Converter converter) {
+        this(time, converter, null);
+    }
+
+    public KafkaStatusBackingStore(Time time, Converter converter, 
Supplier<TopicAdmin> topicAdminSupplier) {
         this.time = time;
         this.converter = converter;
         this.tasks = new Table<>();
         this.connectors = new HashMap<>();
         this.topics = new ConcurrentHashMap<>();
+        this.topicAdminSupplier = topicAdminSupplier;
     }
 
     // visible for testing
@@ -169,6 +177,7 @@ public class KafkaStatusBackingStore implements 
StatusBackingStore {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, 
clusterId);
+        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? 
topicAdminSupplier : () -> new TopicAdmin(adminProps);
 
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) 
config).statusStorageTopicSettings()
@@ -181,27 +190,25 @@ public class KafkaStatusBackingStore implements 
StatusBackingStore {
                 .build();
 
         Callback<ConsumerRecord<String, byte[]>> readCallback = (error, 
record) -> read(record);
-        this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, 
consumerProps, readCallback, topicDescription, adminProps);
+        this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, 
consumerProps, readCallback, topicDescription, adminSupplier);
     }
 
     private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, 
Map<String, Object> producerProps,
                                                               Map<String, 
Object> consumerProps,
                                                               
Callback<ConsumerRecord<String, byte[]>> consumedCallback,
-                                                              final NewTopic 
topicDescription, final Map<String, Object> adminProps) {
-        Runnable createTopics = () -> {
+                                                              final NewTopic 
topicDescription, Supplier<TopicAdmin> adminSupplier) {
+        java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
             log.debug("Creating admin client to manage Connect internal status 
topic");
-            try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-                // Create the topic if it doesn't exist
-                Set<String> newTopics = admin.createTopics(topicDescription);
-                if (!newTopics.contains(topic)) {
-                    // It already existed, so check that the topic cleanup 
policy is compact only and not delete
-                    log.debug("Using admin client to check cleanup policy of 
'{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
-                    admin.verifyTopicCleanupPolicyOnlyCompact(topic,
-                            DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, 
"connector and task statuses");
-                }
+            // Create the topic if it doesn't exist
+            Set<String> newTopics = admin.createTopics(topicDescription);
+            if (!newTopics.contains(topic)) {
+                // It already existed, so check that the topic cleanup policy 
is compact only and not delete
+                log.debug("Using admin client to check cleanup policy of '{}' 
topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                        DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, 
"connector and task statuses");
             }
         };
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
consumedCallback, time, createTopics);
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
adminSupplier, consumedCallback, time, createTopics);
     }
 
     @Override
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 49c6cf2..6a2a787 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -41,10 +41,12 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 
 /**
@@ -79,13 +81,15 @@ public class KafkaBasedLog<K, V> {
     private final Map<String, Object> producerConfigs;
     private final Map<String, Object> consumerConfigs;
     private final Callback<ConsumerRecord<K, V>> consumedCallback;
+    private final Supplier<TopicAdmin> topicAdminSupplier;
     private Consumer<K, V> consumer;
     private Producer<K, V> producer;
+    private TopicAdmin admin;
 
     private Thread thread;
     private boolean stopRequested;
     private Queue<Callback<Void>> readLogEndOffsetCallbacks;
-    private Runnable initializer;
+    private java.util.function.Consumer<TopicAdmin> initializer;
 
     /**
      * Create a new KafkaBasedLog object. This does not start reading the log 
and writing is not permitted until
@@ -103,27 +107,63 @@ public class KafkaBasedLog<K, V> {
      * @param consumedCallback callback to invoke for each {@link 
ConsumerRecord} consumed when tailing the log
      * @param time Time interface
      * @param initializer the component that should be run when this log is 
{@link #start() started}; may be null
+     * @deprecated Replaced by {@link #KafkaBasedLog(String, Map, Map, 
Supplier, Callback, Time, java.util.function.Consumer)}
      */
+    @Deprecated
     public KafkaBasedLog(String topic,
                          Map<String, Object> producerConfigs,
                          Map<String, Object> consumerConfigs,
                          Callback<ConsumerRecord<K, V>> consumedCallback,
                          Time time,
                          Runnable initializer) {
+        this(topic, producerConfigs, consumerConfigs, () -> null, 
consumedCallback, time, initializer != null ? admin -> initializer.run() : 
null);
+    }
+
+    /**
+     * Create a new KafkaBasedLog object. This does not start reading the log 
and writing is not permitted until
+     * {@link #start()} is invoked.
+     *
+     * @param topic              the topic to treat as a log
+     * @param producerConfigs    configuration options to use when creating 
the internal producer. At a minimum this must
+     *                           contain compatible serializer settings for 
the generic types used on this class. Some
+     *                           setting, such as the number of acks, will be 
overridden to ensure correct behavior of this
+     *                           class.
+     * @param consumerConfigs    configuration options to use when creating 
the internal consumer. At a minimum this must
+     *                           contain compatible serializer settings for 
the generic types used on this class. Some
+     *                           setting, such as the auto offset reset 
policy, will be overridden to ensure correct
+     *                           behavior of this class.
+     * @param topicAdminSupplier supplier function for an admin client, the 
lifecycle of which is expected to be controlled
+     *                           by the calling component; may not be null
+     * @param consumedCallback   callback to invoke for each {@link 
ConsumerRecord} consumed when tailing the log
+     * @param time               Time interface
+     * @param initializer        the function that should be run when this log 
is {@link #start() started}; may be null
+     */
+    public KafkaBasedLog(String topic,
+            Map<String, Object> producerConfigs,
+            Map<String, Object> consumerConfigs,
+            Supplier<TopicAdmin> topicAdminSupplier,
+            Callback<ConsumerRecord<K, V>> consumedCallback,
+            Time time,
+            java.util.function.Consumer<TopicAdmin> initializer) {
         this.topic = topic;
         this.producerConfigs = producerConfigs;
         this.consumerConfigs = consumerConfigs;
+        this.topicAdminSupplier = Objects.requireNonNull(topicAdminSupplier);
         this.consumedCallback = consumedCallback;
         this.stopRequested = false;
         this.readLogEndOffsetCallbacks = new ArrayDeque<>();
         this.time = time;
-        this.initializer = initializer != null ? initializer : () -> { };
+        this.initializer = initializer != null ? initializer : admin -> { };
     }
 
     public void start() {
         log.info("Starting KafkaBasedLog with topic " + topic);
 
-        initializer.run();
+        // Create the topic admin client and initialize the topic ...
+        admin = topicAdminSupplier.get();   // may be null
+        initializer.accept(admin);
+
+        // Then create the producer and consumer
         producer = createProducer();
         consumer = createConsumer();
 
@@ -189,6 +229,9 @@ public class KafkaBasedLog<K, V> {
             log.error("Failed to stop KafkaBasedLog consumer", e);
         }
 
+        // do not close the admin client, since we don't own it
+        admin = null;
+
         log.info("Stopped KafkaBasedLog for topic " + topic);
     }
 
@@ -278,7 +321,29 @@ public class KafkaBasedLog<K, V> {
         log.trace("Reading to end of offset log");
 
         Set<TopicPartition> assignment = consumer.assignment();
-        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
+        Map<TopicPartition, Long> endOffsets;
+        // Note that we'd prefer to not use the consumer to find the end 
offsets for the assigned topic partitions.
+        // That is because it's possible that the consumer is already blocked 
waiting for new records to appear, when
+        // the consumer is already at the end. In such cases, using 
'consumer.endOffsets(...)' will block until at least
+        // one more record becomes available, meaning we can't even check 
whether we're at the end offset.
+        // Since all we're trying to do here is get the end offset, we should 
use the supplied admin client
+        // (if available), which avoids blocking on 'consumer.endOffsets(...)' while a consumer poll is in progress.
+
+        // Deprecated constructors do not provide an admin supplier, so the 
admin is potentially null.
+        if (admin != null) {
+            // Use the admin client to immediately find the end offsets for 
the assigned topic partitions.
+            // Unlike using the consumer, this will not block even if a consumer poll is in progress.
+            endOffsets = admin.endOffsets(assignment);
+        } else {
+            // The admin may be null if an older, deprecated constructor was used, though Connect itself now always provides an admin client.
+            // Using the consumer is not ideal, because when the topic has low 
volume, the 'poll(...)' method called from the
+            // work thread may have blocked the consumer while waiting for 
more records (even when there are none).
+            // In such cases, this call to the consumer to simply find the end 
offsets will block even though we might already be
+            // at the end offset.
+            endOffsets = consumer.endOffsets(assignment);
+        }
         log.trace("Reading to end of log offsets {}", endOffsets);
 
         while (!endOffsets.isEmpty()) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
new file mode 100644
index 0000000..a99514e
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically 
created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same 
shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link 
#close(Duration)}.
+ *
+ * <p>The owner of this object is responsible for ensuring that either {@link 
#close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. 
Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, neither the {@link #get()} and {@link #topicAdmin()} methods
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * <p>This class is thread-safe. It also appears as immutable to callers that 
obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore.
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier<TopicAdmin> {
+
+    // Visible for testing
+    static final Duration DEFAULT_CLOSE_DURATION = 
Duration.ofMillis(Long.MAX_VALUE);
+
+    private final Map<String, Object> adminProps;
+    private final AtomicReference<TopicAdmin> admin = new AtomicReference<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Function<Map<String, Object>, TopicAdmin> factory;
+
+    public SharedTopicAdmin(Map<String, Object> adminProps) {
+        this(adminProps, TopicAdmin::new);
+    }
+
+    // Visible for testing
+    SharedTopicAdmin(Map<String, Object> adminProps, Function<Map<String, 
Object>, TopicAdmin> factory) {
+        this.adminProps = Objects.requireNonNull(adminProps);
+        this.factory = Objects.requireNonNull(factory);
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    @Override
+    public TopicAdmin get() {
+        return topicAdmin();
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    public TopicAdmin topicAdmin() {
+        return admin.updateAndGet(this::createAdmin);
+    }
+
+    /**
+     * Get the string containing the list of bootstrap server addresses to the 
Kafka broker(s) to which
+     * the admin client connects.
+     *
+     * @return the bootstrap servers as a string; never null
+     */
+    public String bootstrapServers() {
+        return 
adminProps.getOrDefault(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
"<unknown>").toString();
+    }
+
+    /**
+     * Close the underlying {@link TopicAdmin} instance, if one has been 
created, and prevent new ones from being created.
+     *
+     * <p>Once this method is called, neither the {@link #get()} and {@link #topicAdmin()} methods
+     * nor any previously returned {@link TopicAdmin} instances may be used.
+     */
+    @Override
+    public void close() {
+        close(DEFAULT_CLOSE_DURATION);
+    }
+
+    /**
+     * Close the underlying {@link TopicAdmin} instance, if one has been 
created, and prevent new ones from being created.
+     *
+     * <p>Once this method is called, neither the {@link #get()} and {@link #topicAdmin()} methods
+     * nor any previously returned {@link TopicAdmin} instances may be used.
+     *
+     * @param timeout the maximum time to wait while the underlying admin 
client is closed; may not be null
+     */
+    public void close(Duration timeout) {
+        Objects.requireNonNull(timeout);
+        if (this.closed.compareAndSet(false, true)) {
+            TopicAdmin admin = this.admin.getAndSet(null);
+            if (admin != null) {
+                admin.close(timeout);
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "admin client for brokers at " + bootstrapServers();
+    }
+
+    /**
+     * Method used to create a {@link TopicAdmin} instance. This method must 
be side-effect free, since it is called from within
+     * the {@link AtomicReference#updateAndGet(UnaryOperator)}.
+     *
+     * @param existing the existing instance; may be null
+     * @return the existing instance if one was already created, otherwise a newly created instance; never null
+     */
+    protected TopicAdmin createAdmin(TopicAdmin existing) {
+        if (closed.get()) {
+            throw new ConnectException("The " + this + " has already been 
closed and cannot be used.");
+        }
+        if (existing != null) {
+            return existing;
+        }
+        return factory.apply(adminProps);
+    }
+}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 7b208e4..9a7907b 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -23,14 +23,20 @@ import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.CreateTopicsOptions;
 import org.apache.kafka.clients.admin.DescribeConfigsOptions;
 import org.apache.kafka.clients.admin.DescribeTopicsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TopicExistsException;
@@ -53,6 +59,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -283,6 +290,14 @@ public class TopicAdmin implements AutoCloseable {
     }
 
     /**
+     * Get the {@link Admin} client used by this topic admin object.
+     * @return the Kafka admin instance; never null
+     */
+    public Admin admin() {
+        return admin;
+    }
+
+   /**
      * Attempt to create the topic described by the given definition, 
returning true if the topic was created or false
      * if the topic already existed.
      *
@@ -630,6 +645,58 @@ public class TopicAdmin implements AutoCloseable {
         return result;
     }
 
+    /**
+     * Fetch the most recent offset for each of the supplied {@link 
TopicPartition} objects.
+     *
+     * @param partitions the topic partitions
+     * @return the map of end offsets for each topic partition, or an empty map if 
the supplied partitions
+     *         are null or empty
+     * @throws RetriableException if a retriable error occurs, the operation 
takes too long, or the
+     *         thread is interrupted while attempting to perform this operation
+     * @throws ConnectException if a non-retriable error occurs
+     */
+    public Map<TopicPartition, Long> endOffsets(Set<TopicPartition> 
partitions) {
+        if (partitions == null || partitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        Map<TopicPartition, OffsetSpec> offsetSpecMap = 
partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> 
OffsetSpec.latest()));
+        ListOffsetsResult resultFuture = admin.listOffsets(offsetSpecMap);
+        // Get the individual result for each topic partition so we have 
better error messages
+        Map<TopicPartition, Long> result = new HashMap<>();
+        for (TopicPartition partition : partitions) {
+            try {
+                ListOffsetsResultInfo info = 
resultFuture.partitionResult(partition).get();
+                result.put(partition, info.offset());
+            } catch (ExecutionException e) {
+                Throwable cause = e.getCause();
+                String topic = partition.topic();
+                if (cause instanceof AuthorizationException) {
+                    String msg = String.format("Not authorized to get the end 
offsets for topic '%s' on brokers at %s", topic, bootstrapServers());
+                    throw new ConnectException(msg, e);
+                } else if (cause instanceof UnsupportedVersionException) {
+                    // Should theoretically never happen, because this method 
is the same as what the consumer uses and therefore
+                    // should exist in the broker since before the admin 
client was added
+                    String msg = String.format("API to get the get the end 
offsets for topic '%s' is unsupported on brokers at %s", topic, 
bootstrapServers());
+                    throw new ConnectException(msg, e);
+                } else if (cause instanceof TimeoutException) {
+                    String msg = String.format("Timed out while waiting to get 
end offsets for topic '%s' on brokers at %s", topic, bootstrapServers());
+                    throw new RetriableException(msg, e);
+                } else if (cause instanceof LeaderNotAvailableException) {
+                    String msg = String.format("Unable to get end offsets 
during leader election for topic '%s' on brokers at %s", topic, 
bootstrapServers());
+                    throw new RetriableException(msg, e);
+                } else {
+                    String msg = String.format("Error while getting end 
offsets for topic '%s' on brokers at %s", topic, bootstrapServers());
+                    throw new ConnectException(msg, e);
+                }
+            } catch (InterruptedException e) {
+                Thread.interrupted();
+                String msg = String.format("Interrupted while attempting to 
read end offsets for topic '%s' on brokers at %s", partition.topic(), 
bootstrapServers());
+                throw new RetriableException(msg, e);
+            }
+        }
+        return result;
+    }
+
     @Override
     public void close() {
         admin.close();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 5bddcf7..e31a03f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -75,6 +75,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -189,6 +190,7 @@ public class DistributedHerderTest {
     @Mock private Plugins plugins;
     @Mock private PluginClassLoader pluginLoader;
     @Mock private DelegatingClassLoader delegatingLoader;
+    private CountDownLatch shutdownCalled = new CountDownLatch(1);
 
     private ConfigBackingStore.UpdateListener configUpdateListener;
     private WorkerRebalanceListener rebalanceListener;
@@ -206,6 +208,7 @@ public class DistributedHerderTest {
         metrics = new MockConnectMetrics(time);
         worker = PowerMock.createMock(Worker.class);
         
EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);
+        AutoCloseable uponShutdown = () -> shutdownCalled.countDown();
 
         // Default to the old protocol unless specified otherwise
         connectProtocolVersion = CONNECT_PROTOCOL_V0;
@@ -213,7 +216,8 @@ public class DistributedHerderTest {
         herder = PowerMock.createPartialMock(DistributedHerder.class,
                 new String[]{"connectorTypeForClass", 
"updateDeletedConnectorStatus", "updateDeletedTaskStatus", 
"validateConnectorConfig"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, 
KAFKA_CLUSTER_ID,
-                statusBackingStore, configBackingStore, member, MEMBER_URL, 
metrics, time, noneConnectorClientConfigOverridePolicy);
+                statusBackingStore, configBackingStore, member, MEMBER_URL, 
metrics, time, noneConnectorClientConfigOverridePolicy,
+                new AutoCloseable[]{uponShutdown});
 
         configUpdateListener = herder.new ConfigUpdateListener();
         rebalanceListener = herder.new RebalanceListener(time);
@@ -2211,6 +2215,13 @@ public class DistributedHerderTest {
                 
getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("StartAndStopExecutor"));
     }
 
+    @Test
+    public void testHerderStopServicesClosesUponShutdown() {
+        assertEquals(1, shutdownCalled.getCount());
+        herder.stopServices();
+        assertEquals(0, shutdownCalled.getCount());
+    }
+
     private void expectRebalance(final long offset,
                                  final List<String> assignedConnectors,
                                  final List<ConnectorTaskId> assignedTasks) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index eca291e..4504d39 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TestFuture;
+import org.apache.kafka.connect.util.TopicAdmin;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -55,6 +56,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -140,7 +142,7 @@ public class KafkaConfigBackingStoreTest {
     private Capture<String> capturedTopic = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedAdminProps = EasyMock.newCapture();
+    private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
     private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
     private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
 
@@ -893,7 +895,7 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
                 EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
                 EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback),
-                EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps))
+                EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier))
                 .andReturn(storeLog);
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index 0d7c133..d216068 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -49,6 +50,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -105,7 +107,7 @@ public class KafkaOffsetBackingStoreTest {
     private Capture<String> capturedTopic = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedAdminProps = EasyMock.newCapture();
+    private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
     private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
     private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
 
@@ -370,7 +372,7 @@ public class KafkaOffsetBackingStoreTest {
     private void expectConfigure() throws Exception {
         PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
                 EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback),
-                EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps))
+                EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier))
                 .andReturn(storeLog);
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
new file mode 100644
index 0000000..f5ac6a7
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Rule;
+import org.mockito.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import static org.apache.kafka.connect.util.SharedTopicAdmin.DEFAULT_CLOSE_DURATION;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> EMPTY_CONFIG = Collections.emptyMap();
+
+    @Rule
+    public MockitoRule rule = MockitoJUnit.rule();
+
+    @Mock private TopicAdmin mockTopicAdmin;
+    @Mock private Function<Map<String, Object>, TopicAdmin> factory;
+    private SharedTopicAdmin sharedAdmin;
+
+    @Before
+    public void beforeEach() {
+        when(factory.apply(anyMap())).thenReturn(mockTopicAdmin);
+        sharedAdmin = new SharedTopicAdmin(EMPTY_CONFIG, factory::apply);
+    }
+
+    @Test
+    public void shouldCloseWithoutBeingUsed() {
+        // When closed before being used
+        sharedAdmin.close();
+        // Then should not create or close admin
+        verifyTopicAdminCreatesAndCloses(0);
+    }
+
+    @Test
+    public void shouldCloseAfterTopicAdminUsed() {
+        // When used and then closed
+        assertSame(mockTopicAdmin, sharedAdmin.topicAdmin());
+        sharedAdmin.close();
+        // Then should have created and closed just one admin
+        verifyTopicAdminCreatesAndCloses(1);
+    }
+
+    @Test
+    public void shouldCloseAfterTopicAdminUsedMultipleTimes() {
+        // When used many times and then closed
+        for (int i = 0; i != 10; ++i) {
+            assertSame(mockTopicAdmin, sharedAdmin.topicAdmin());
+        }
+        sharedAdmin.close();
+        // Then should have created and closed just one admin
+        verifyTopicAdminCreatesAndCloses(1);
+    }
+
+    @Test
+    public void shouldCloseWithDurationAfterTopicAdminUsed() {
+        // When used and then closed with a custom timeout
+        Duration timeout = Duration.ofSeconds(1);
+        assertSame(mockTopicAdmin, sharedAdmin.topicAdmin());
+        sharedAdmin.close(timeout);
+        // Then should have created and closed just one admin using the supplied timeout
+        verifyTopicAdminCreatesAndCloses(1, timeout);
+    }
+
+    @Test
+    public void shouldFailToGetTopicAdminAfterClose() {
+        // When closed
+        sharedAdmin.close();
+        // Then using the admin should fail
+        assertThrows(ConnectException.class, () -> sharedAdmin.topicAdmin());
+    }
+
+    private void verifyTopicAdminCreatesAndCloses(int count) {
+        verifyTopicAdminCreatesAndCloses(count, DEFAULT_CLOSE_DURATION);
+    }
+
+    private void verifyTopicAdminCreatesAndCloses(int count, Duration expectedDuration) {
+        verify(factory, times(count)).apply(anyMap());
+        verify(mockTopicAdmin, times(count)).close(eq(expectedDuration));
+    }
+}
\ No newline at end of file
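
A hedged usage sketch for the new class, based only on the API exercised by the test above; the two-argument constructor mirrors the test and may only be accessible within the util package, and the wiring method itself is hypothetical:

    import java.util.Map;
    import java.util.function.Supplier;
    import org.apache.kafka.connect.util.SharedTopicAdmin;
    import org.apache.kafka.connect.util.TopicAdmin;

    public class SharedAdminWiring {
        // Hypothetical wiring: one SharedTopicAdmin per worker, handed to each backing
        // store as a Supplier<TopicAdmin> so they all reuse a single, lazily created admin.
        static Supplier<TopicAdmin> wireSharedAdmin(Map<String, Object> adminProps) {
            SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps, TopicAdmin::new);
            // Callers should arrange for sharedAdmin.close() to run at shutdown; after
            // close(), topicAdmin() throws ConnectException (see the test above).
            return sharedAdmin::topicAdmin;
        }
    }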
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 646da71..9ba0b1d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -26,6 +26,8 @@ import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource;
@@ -36,19 +38,26 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.DescribeConfigsResponseData;
+import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -456,17 +465,273 @@ public class TopicAdminTest {
         }
     }
 
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("is unsupported on brokers"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            RetriableException e = assertThrows(RetriableException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Timed out while waiting"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithUnknownError(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Error while getting end offsets for topic"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() {
+        String topicName = "myTopic";
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(Collections.emptySet());
+            assertTrue(offsets.isEmpty());
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForOnePartition() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000L;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(1, offsets.size());
+            assertEquals(Long.valueOf(offset), offsets.get(tp1));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForMultiplePartitions() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        TopicPartition tp2 = new TopicPartition(topicName, 1);
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2));
+        long offset1 = 1001;
+        long offset2 = 1002;
+        Cluster cluster = createCluster(1, topicName, 2);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset1, tp2, offset2));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(2, offsets.size());
+            assertEquals(Long.valueOf(offset1), offsets.get(tp1));
+            assertEquals(Long.valueOf(offset2), offsets.get(tp2));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, null));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
+        }
+    }
+
     private Cluster createCluster(int numNodes) {
+        return createCluster(numNodes, "unused", 0);
+    }
+
+    private Cluster createCluster(int numNodes, String topicName, int partitions) {
+        Node[] nodeArray = new Node[numNodes];
         HashMap<Integer, Node> nodes = new HashMap<>();
         for (int i = 0; i < numNodes; ++i) {
-            nodes.put(i, new Node(i, "localhost", 8121 + i));
-        }
-        Cluster cluster = new Cluster("mockClusterId", nodes.values(),
-                Collections.emptySet(), Collections.emptySet(),
-                Collections.emptySet(), nodes.get(0));
+            nodeArray[i] = new Node(i, "localhost", 8121 + i);
+            nodes.put(i, nodeArray[i]);
+        }
+        Node leader = nodeArray[0];
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        for (int i = 0; i < partitions; ++i) {
+            pInfos.add(new PartitionInfo(topicName, i, leader, nodeArray, nodeArray));
+        }
+        Cluster cluster = new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                pInfos,
+                Collections.emptySet(),
+                Collections.emptySet(),
+                leader);
         return cluster;
     }
 
+    private MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
+        List<MetadataResponseTopic> metadata = new ArrayList<>();
+        for (String topic : cluster.topics()) {
+            List<MetadataResponseData.MetadataResponsePartition> pms = new ArrayList<>();
+            for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
+                MetadataResponseData.MetadataResponsePartition pm  = new MetadataResponseData.MetadataResponsePartition()
+                        .setErrorCode(error.code())
+                        .setPartitionIndex(pInfo.partition())
+                        .setLeaderId(pInfo.leader().id())
+                        .setLeaderEpoch(234)
+                        .setReplicaNodes(Arrays.stream(pInfo.replicas()).map(Node::id).collect(Collectors.toList()))
+                        .setIsrNodes(Arrays.stream(pInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toList()))
+                        .setOfflineReplicas(Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
+                pms.add(pm);
+            }
+            MetadataResponseTopic tm = new MetadataResponseTopic()
+                    .setErrorCode(error.code())
+                    .setName(topic)
+                    .setIsInternal(false)
+                    .setPartitions(pms);
+            metadata.add(tm);
+        }
+        return MetadataResponse.prepareResponse(true,
+                0,
+                cluster.nodes(),
+                cluster.clusterResource().clusterId(),
+                cluster.controller().id(),
+                metadata,
+                MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
+    }
+
+    private ListOffsetsResponse listOffsetsResultWithUnknownError(TopicPartition tp1, Long offset1) {
+        return listOffsetsResult(
+                new ApiError(Errors.UNKNOWN_SERVER_ERROR, "Unknown error"),
+                Collections.singletonMap(tp1, offset1)
+        );
+    }
+
+    private ListOffsetsResponse listOffsetsResultWithTimeout(TopicPartition tp1, Long offset1) {
+        return listOffsetsResult(
+                new ApiError(Errors.REQUEST_TIMED_OUT, "Request timed out"),
+                Collections.singletonMap(tp1, offset1)
+        );
+    }
+
+    private ListOffsetsResponse listOffsetsResultWithUnsupportedVersion(TopicPartition tp1, Long offset1) {
+        return listOffsetsResult(
+                new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"),
+                Collections.singletonMap(tp1, offset1)
+        );
+    }
+
+    private ListOffsetsResponse listOffsetsResultWithClusterAuthorizationException(TopicPartition tp1, Long offset1) {
+        return listOffsetsResult(
+                new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"),
+                Collections.singletonMap(tp1, offset1)
+        );
+    }
+
+    private ListOffsetsResponse listOffsetsResult(TopicPartition tp1, Long offset1) {
+        return listOffsetsResult(null, Collections.singletonMap(tp1, offset1));
+    }
+
+    private ListOffsetsResponse listOffsetsResult(TopicPartition tp1, Long offset1, TopicPartition tp2, Long offset2) {
+        Map<TopicPartition, Long> offsetsByPartitions = new HashMap<>();
+        offsetsByPartitions.put(tp1, offset1);
+        offsetsByPartitions.put(tp2, offset2);
+        return listOffsetsResult(null, offsetsByPartitions);
+    }
+
+    /**
+     * Create a ListOffsetsResponse that exposes the supplied error and includes offsets for the supplied partitions.
+     * @param error               the error; may be null if an unknown error should be used
+     * @param offsetsByPartitions offset for each partition, where a null offset signals the error should be used
+     * @return the response
+     */
+    private ListOffsetsResponse listOffsetsResult(ApiError error, Map<TopicPartition, Long> offsetsByPartitions) {
+        if (error == null) error = new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "unknown topic");
+        List<ListOffsetsTopicResponse> tpResponses = new ArrayList<>();
+        for (TopicPartition partition : offsetsByPartitions.keySet()) {
+            Long offset = offsetsByPartitions.get(partition);
+            ListOffsetsTopicResponse topicResponse;
+            if (offset == null) {
+                topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(partition, error.error(), -1L, 0, 321);
+            } else {
+                topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(partition, Errors.NONE, -1L, offset, 321);
+            }
+            tpResponses.add(topicResponse);
+        }
+        ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(tpResponses);
+
+        return new ListOffsetsResponse(responseData);
+    }
+
+    private CreateTopicsResponse createTopicResponseWithUnsupportedVersion(NewTopic... topics) {
         return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), topics);
     }
