kkonstantine commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570584036



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -209,7 +229,8 @@ public DistributedHerder(DistributedConfig config,
                       String restUrl,
                       ConnectMetrics metrics,
                       Time time,
-                      ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                      ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                      AutoCloseable... uponShutdown) {

Review comment:
       see comment above

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -185,16 +188,33 @@
 
     private final DistributedConfig config;
 
+    /**
+     * Create a herder that will form a Connect cluster with other {@link 
DistributedHerder} instances (in this or other JVMs)
+     * that have the same group ID.
+     *
+     * @param config             the configuration for the worker; may not be 
null
+     * @param time               the clock to use; may not be null
+     * @param worker             the {@link Worker} instance to use; may not 
be null
+     * @param kafkaClusterId     the identifier of the Kafka cluster to use 
for internal topics; may not be null
+     * @param statusBackingStore the backing store for statuses; may not be 
null
+     * @param configBackingStore the backing store for connector 
configurations; may not be null
+     * @param restUrl            the URL of this herder's REST API; may not be 
null
+     * @param connectorClientConfigOverridePolicy the policy specifying the 
client configuration properties that may be overridden
+     *                                            in connector configurations; 
may not be null
+     * @param uponShutdown       any {@link AutoCloseable} objects that should 
be closed when this herder is {@link #stop() stopped},
+     *                           after all services and resources owned by 
this herder are stopped
+     */
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
                              String kafkaClusterId,
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl,
-                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                             AutoCloseable... uponShutdown) {

Review comment:
       I think it's better to avoid a variadic argument here. 
   Parameters tend to get added to constructors like this one as new features 
arrive. If a later parameter also needs to be a list, we'll end up with a mix 
of list arguments and a trailing variadic one. 
   
   Since we convert the varargs to a list anyway, I'd suggest declaring the 
parameter as a `List` and passing the single argument with 
`Collections.singletonList` in the caller. 
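   
   A minimal sketch of that suggestion, not the actual `DistributedHerder` 
code. `ListArgHerderSketch` and its two-argument constructor are hypothetical 
stand-ins that model only the shutdown hooks: 

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for DistributedHerder; only the shutdown hooks are modelled.
class ListArgHerderSketch {
    private final String restUrl;
    private final List<AutoCloseable> uponShutdown;

    // A List parameter (rather than AutoCloseable...) does not have to stay last,
    // so further parameters, including other lists, can be appended later.
    ListArgHerderSketch(String restUrl, List<AutoCloseable> uponShutdown) {
        this.restUrl = restUrl;
        this.uponShutdown = new ArrayList<>(uponShutdown); // defensive copy
    }

    // Closes every registered hook; failures are swallowed so that all hooks run.
    void stop() {
        for (AutoCloseable closeable : uponShutdown) {
            try {
                closeable.close();
            } catch (Exception e) {
                // ignored in this sketch; real code would log the failure
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger closed = new AtomicInteger();
        AutoCloseable hook = closed::incrementAndGet;
        // The caller wraps its single hook instead of relying on varargs.
        ListArgHerderSketch herder =
                new ListArgHerderSketch("http://localhost:8083", Collections.singletonList(hook));
        herder.stop();
        System.out.println("hooks closed: " + closed.get());
    }
}
```

   The caller pays for one `Collections.singletonList` call, and in exchange 
the parameter list stays free to grow. 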

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> CONFIG = Collections.emptyMap();

Review comment:
       Should we call it `EMPTY_CONFIG`, since it won't change?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically 
created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same 
shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link 
#close(Duration)}.
+ *
+ * <p>The owner of this object is responsible for ensuring that either {@link 
#close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. 
Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and 
{@link #topicAdmin()} methods,
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * <p>This class is thread-safe. It also appears as immutable to callers that 
obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier<TopicAdmin> {
+
+    // Visible for testing
+    static final Duration DEFAULT_CLOSE_DURATION = 
Duration.ofMillis(Long.MAX_VALUE);
+
+    private final Map<String, Object> adminProps;
+    private final AtomicReference<TopicAdmin> admin = new AtomicReference<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Function<Map<String, Object>, TopicAdmin> factory;
+
+    public SharedTopicAdmin(Map<String, Object> adminProps) {
+        this(adminProps, TopicAdmin::new);
+    }
+
+    // Visible for testing
+    SharedTopicAdmin(Map<String, Object> adminProps, Function<Map<String, 
Object>, TopicAdmin> factory) {
+        this.adminProps = Objects.requireNonNull(adminProps);
+        this.factory = Objects.requireNonNull(factory);
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    @Override
+    public TopicAdmin get() {
+        return topicAdmin();
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    public TopicAdmin topicAdmin() {
+        return admin.updateAndGet(this::createAdmin);

Review comment:
       It's not immediately obvious to me what the advantage is compared to a 
synchronized `topicAdmin` or even `get`. 
   
   I see that the value can go back to null. But we already have the `closed` 
guard to decide atomically whether this is closed or not. Lmk if I'm missing 
something, but I feel there are a few more points of indirection here than 
there have to be (we gain nothing in locking, since every get still has to 
call `updateAndGet` atomically, and the advantage over `synchronized` should 
be negligible here). 
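   
   For comparison, a rough sketch of the `synchronized` alternative I have in 
mind. `SynchronizedHolderSketch` is a hypothetical, simplified holder: a 
generic `AutoCloseable` resource stands in for `TopicAdmin`, and 
`IllegalStateException` stands in for `ConnectException`: 

```java
import java.util.function.Supplier;

// Hypothetical simplified holder; T stands in for TopicAdmin.
class SynchronizedHolderSketch<T extends AutoCloseable> implements AutoCloseable, Supplier<T> {
    private final Supplier<T> factory;
    private T instance;     // guarded by "this"
    private boolean closed; // guarded by "this"

    SynchronizedHolderSketch(Supplier<T> factory) {
        this.factory = factory;
    }

    // Lazily creates the shared instance; every caller sees the same one until close().
    @Override
    public synchronized T get() {
        if (closed) {
            // IllegalStateException is a stand-in for ConnectException in this sketch
            throw new IllegalStateException("The holder has already been closed");
        }
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    // Idempotent: only the first call closes the shared instance, if one was created.
    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        if (instance != null) {
            try {
                instance.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                instance = null;
            }
        }
    }
}
```

   Both the flag and the reference live under one lock here, so there is no 
separate atomic state to keep consistent. 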

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -676,6 +698,16 @@ public void halt() {
         }
     }
 
+    @Override
+    protected void stopServices() {
+        try {
+            super.stopServices();
+        } finally {
+            this.uponShutdown.stream().forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "<unknown>"));
+        }
+

Review comment:
       nit: extra blank line

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -676,6 +698,16 @@ public void halt() {
         }
     }
 
+    @Override
+    protected void stopServices() {
+        try {
+            super.stopServices();
+        } finally {
+            this.uponShutdown.stream().forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "<unknown>"));

Review comment:
       I'd guess `stream().forEach` can be simplified to just `forEach`
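   
   Roughly what I mean, as a self-contained sketch. `ForEachSketch` and its 
local `closeQuietly` are hypothetical; the helper only stands in for the 
project's `Utils.closeQuietly`: 

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ForEachSketch {
    // Hypothetical local helper, standing in for the project's Utils.closeQuietly.
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception e) {
            System.err.println("Failed to close " + name + ": " + e);
        }
    }

    static void closeAll(List<AutoCloseable> uponShutdown) {
        // Iterable.forEach is enough for a plain traversal; no stream is needed.
        uponShutdown.forEach(closeable ->
                closeQuietly(closeable, closeable != null ? closeable.toString() : "<unknown>"));
    }

    public static void main(String[] args) {
        AtomicInteger closed = new AtomicInteger();
        AutoCloseable hook = closed::incrementAndGet;
        // A null entry is tolerated, matching the null check in the original lambda.
        closeAll(Arrays.asList(hook, null, hook));
        System.out.println("close calls: " + closed.get());
    }
}
```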

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
##########
@@ -207,14 +209,16 @@ public void setUp() throws Exception {
         metrics = new MockConnectMetrics(time);
         worker = PowerMock.createMock(Worker.class);
         
EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);
+        AutoCloseable uponShutdown = () -> shutdownCalled.countDown();
 
         // Default to the old protocol unless specified otherwise
         connectProtocolVersion = CONNECT_PROTOCOL_V0;
 
         herder = PowerMock.createPartialMock(DistributedHerder.class,
                 new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
-                statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
+                statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy,
+                new AutoCloseable[]{uponShutdown});

Review comment:
       See comment above. We'd avoid the static array initialization too. This 
is test code, but it's still a place where array lists come in handy.

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
##########
@@ -457,17 +465,273 @@ public void verifyingGettingTopicCleanupPolicies() {
         }
     }
 
+    @Test
+    public void 
endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            
env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1,
 offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end 
offsets"));
+        }
+    }
+
+    @Test
+    public void 
endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            
env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, 
offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("is unsupported on brokers"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            
env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            RetriableException e = assertThrows(RetriableException.class, () 
-> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Timed out while waiting"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            
env.kafkaClient().prepareResponse(listOffsetsResultWithUnknownError(tp1, 
offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Error while getting end 
offsets for topic"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() {
+        String topicName = "myTopic";
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = 
admin.endOffsets(Collections.emptySet());
+            assertTrue(offsets.isEmpty());
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForOnePartition() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000L;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(1, offsets.size());
+            assertEquals(Long.valueOf(offset), offsets.get(tp1));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForMultiplePartitions() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        TopicPartition tp2 = new TopicPartition(topicName, 1);
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2));
+        long offset1 = 1001;
+        long offset2 = 1002;
+        Cluster cluster = createCluster(1, topicName, 2);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset1, 
tp2, offset2));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(2, offsets.size());
+            assertEquals(Long.valueOf(offset1), offsets.get(tp1));
+            assertEquals(Long.valueOf(offset2), offsets.get(tp2));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            
env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1,
 null));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end 
offsets"));
+        }
+    }
+
     private Cluster createCluster(int numNodes) {
+        return createCluster(numNodes, "unused", 0);
+    }
+
+    private Cluster createCluster(int numNodes, String topicName, int 
partitions) {
+        Node[] nodeArray = new Node[numNodes];

Review comment:
       Curious, what does the array represent here now?
   We used to have one value. Are these the ISR nodes? Do we even need to add 
or remove any? 

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> CONFIG = Collections.emptyMap();
+
+    @Mock private TopicAdmin mockTopicAdmin;
+    private SharedTopicAdmin sharedAdmin;
+    private int created = 0;

Review comment:
       Is there a specific action on the mock that we want to, or can, verify 
here instead of implicitly relying on an auxiliary variable?
   Replay, expectations and verify should help us check for the action or its 
absence. I'd have to look more closely at what that action could be, if there 
is one. Maybe you can see that more easily. 

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -185,16 +188,33 @@
 
     private final DistributedConfig config;
 
+    /**
+     * Create a herder that will form a Connect cluster with other {@link 
DistributedHerder} instances (in this or other JVMs)
+     * that have the same group ID.
+     *
+     * @param config             the configuration for the worker; may not be 
null
+     * @param time               the clock to use; may not be null
+     * @param worker             the {@link Worker} instance to use; may not 
be null
+     * @param kafkaClusterId     the identifier of the Kafka cluster to use 
for internal topics; may not be null
+     * @param statusBackingStore the backing store for statuses; may not be 
null
+     * @param configBackingStore the backing store for connector 
configurations; may not be null
+     * @param restUrl            the URL of this herder's REST API; may not be 
null
+     * @param connectorClientConfigOverridePolicy the policy specifying the 
client configuration properties that may be overridden
+     *                                            in connector configurations; 
may not be null
+     * @param uponShutdown       any {@link AutoCloseable} objects that should 
be closed when this herder is {@link #stop() stopped},
+     *                           after all services and resources owned by 
this herder are stopped
+     */
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
                              String kafkaClusterId,
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl,
-                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                             AutoCloseable... uponShutdown) {

Review comment:
       We can always keep a constructor with the old signature alongside the 
new one if we don't want to break classes that use `DistributedHerder`. I'm 
fine with the change here as a short-term workaround. I guess it saves us one 
constructor, but we can use this trick only once.
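   
   A small sketch of the "keep the old signature" option. 
`OverloadedHerderSketch` is hypothetical, and its parameters are cut down to 
one for brevity: 

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for DistributedHerder with a reduced parameter list.
class OverloadedHerderSketch {
    private final String restUrl;
    private final List<AutoCloseable> uponShutdown;

    // Old signature, kept so existing callers keep compiling; delegates with no hooks.
    OverloadedHerderSketch(String restUrl) {
        this(restUrl, Collections.emptyList());
    }

    // New signature that accepts the resources to close on shutdown.
    OverloadedHerderSketch(String restUrl, List<AutoCloseable> uponShutdown) {
        this.restUrl = restUrl;
        this.uponShutdown = new ArrayList<>(uponShutdown);
    }

    int shutdownHookCount() {
        return uponShutdown.size();
    }

    public static void main(String[] args) {
        AutoCloseable hook = () -> { };
        System.out.println(new OverloadedHerderSketch("http://localhost:8083").shutdownHookCount());
        System.out.println(new OverloadedHerderSketch("http://localhost:8083",
                Collections.singletonList(hook)).shutdownHookCount());
    }
}
```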

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically 
created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same 
shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link 
#close(Duration)}.
+ *
+ * <p>The owner of this object is responsible for ensuring that either {@link 
#close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. 
Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and 
{@link #topicAdmin()} methods,
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * <p>This class is thread-safe. It also appears as immutable to callers that 
obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier<TopicAdmin> {
+
+    // Visible for testing
+    static final Duration DEFAULT_CLOSE_DURATION = 
Duration.ofMillis(Long.MAX_VALUE);
+
+    private final Map<String, Object> adminProps;
+    private final AtomicReference<TopicAdmin> admin = new AtomicReference<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Function<Map<String, Object>, TopicAdmin> factory;
+
+    public SharedTopicAdmin(Map<String, Object> adminProps) {
+        this(adminProps, TopicAdmin::new);
+    }
+
+    // Visible for testing
+    SharedTopicAdmin(Map<String, Object> adminProps, Function<Map<String, 
Object>, TopicAdmin> factory) {
+        this.adminProps = Objects.requireNonNull(adminProps);
+        this.factory = Objects.requireNonNull(factory);
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    @Override
+    public TopicAdmin get() {
+        return topicAdmin();
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    public TopicAdmin topicAdmin() {
+        return admin.updateAndGet(this::createAdmin);

Review comment:
       I'm happy to leave it as an example of the pattern that demonstrates how 
to apply `updateAndGet`. 
   
   I just didn't feel that the three or so levels of indirection were worth it 
just to write the singleton pattern differently.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically 
created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same 
shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link 
#close(Duration)}.
+ *
+ * <p>The owner of this object is responsible for ensuring that either {@link 
#close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. 
Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and 
{@link #topicAdmin()} methods,
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * <p>This class is thread-safe. It also appears as immutable to callers that 
obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier<TopicAdmin> {
+
+    // Visible for testing
+    static final Duration DEFAULT_CLOSE_DURATION = 
Duration.ofMillis(Long.MAX_VALUE);
+
+    private final Map<String, Object> adminProps;
+    private final AtomicReference<TopicAdmin> admin = new AtomicReference<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Function<Map<String, Object>, TopicAdmin> factory;
+
+    public SharedTopicAdmin(Map<String, Object> adminProps) {
+        this(adminProps, TopicAdmin::new);
+    }
+
+    // Visible for testing
+    SharedTopicAdmin(Map<String, Object> adminProps, Function<Map<String, Object>, TopicAdmin> factory) {
+        this.adminProps = Objects.requireNonNull(adminProps);
+        this.factory = Objects.requireNonNull(factory);
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    @Override
+    public TopicAdmin get() {
+        return topicAdmin();
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    public TopicAdmin topicAdmin() {
+        return admin.updateAndGet(this::createAdmin);

Review comment:
       I'm happy to leave it as an example of the pattern that demonstrates how
to apply `updateAndGet`.
   
   I just didn't feel that two or three levels of indirection were worth it
merely to write the singleton pattern differently.
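
For readers unfamiliar with the pattern under discussion, here is a minimal, self-contained sketch of lazy shared initialization with `AtomicReference.updateAndGet`. The `LazyShared` class and its `Supplier`-based factory are hypothetical stand-ins chosen for illustration; this is not the `SharedTopicAdmin` code from the PR, and it throws `IllegalStateException` where the PR documents `ConnectException`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical holder used only to illustrate the updateAndGet pattern.
final class LazyShared<T> implements Supplier<T>, AutoCloseable {

    private final Supplier<T> factory;
    private final AtomicReference<T> ref = new AtomicReference<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    LazyShared(Supplier<T> factory) {
        this.factory = factory;
    }

    @Override
    public T get() {
        // updateAndGet applies the function to the current value and stores the result.
        // Returning the existing value unchanged means later callers share the stored instance.
        return ref.updateAndGet(existing -> {
            if (closed.get()) {
                throw new IllegalStateException("already closed");
            }
            return existing != null ? existing : factory.get();
        });
    }

    @Override
    public void close() {
        // Only flips the flag; a real holder would also close the shared instance here.
        closed.set(true);
    }
}
```

One caveat worth keeping in mind: the `updateAndGet` documentation notes that the function may be re-applied when an update fails under contention, so a factory with side effects can run more than once and the extra instance is discarded. A plain `synchronized` getter avoids that at the cost of a lock, which is roughly the trade-off behind the remark about indirection.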

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> CONFIG = Collections.emptyMap();
+
+    @Mock private TopicAdmin mockTopicAdmin;
+    private SharedTopicAdmin sharedAdmin;
+    private int created = 0;

Review comment:
       I know. It's just that we already use a mocking framework, so we could
use something like:
   
   
`EasyMock.expect(factory.apply(EasyMock.anyObject())).andReturn(mockTopicAdmin).anyTimes();`
   
   if we defined `factory` as a mock as well. That would let us evaluate
expectations on the mock more precisely (e.g. with a capture, if we had to).
But sure, if we need something quick and easy we can go with the current
approach. I just noticed that the test mixes mocks with this variable, which
simulates what the mocking framework already offers.



