This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push: new 333bd7c KAFKA-12340: Fix potential resource leak in Kafka*BackingStore (#10153) 333bd7c is described below commit 333bd7ce9fefd22647f7f2729cd528894e3e8bd9 Author: Randall Hauch <rha...@gmail.com> AuthorDate: Fri Feb 19 11:49:56 2021 -0600 KAFKA-12340: Fix potential resource leak in Kafka*BackingStore (#10153) These Kafka*BackingStore classes used in Connect have a recently-added deprecated constructor, which is not used within AK. However, this commit corrects a AdminClient resource leak if those deprecated constructors are used outside of AK. The fix simply ensures that the AdminClient created by the “default” supplier is always closed when the Kafka*BackingStore is stopped. Author: Randall Hauch <rha...@gmail.com> Reviewers: Konstantine Karantasis <konstant...@confluent.io>, Chia-Ping Tsai <chia7...@gmail.com> --- .../connect/storage/KafkaConfigBackingStore.java | 19 +++++++++++++++++-- .../connect/storage/KafkaOffsetBackingStore.java | 19 +++++++++++++++++-- .../connect/storage/KafkaStatusBackingStore.java | 19 +++++++++++++++++-- 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index d4e6358..dcfc28c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -44,6 +44,7 @@ import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.SharedTopicAdmin; import org.apache.kafka.connect.util.TopicAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,6 +227,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>(); private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>(); private final Supplier<TopicAdmin> topicAdminSupplier; + private SharedTopicAdmin ownTopicAdmin; // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data // is in an inconsistent state and we cannot safely use them until they have been refreshed. @@ -291,7 +293,13 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { @Override public void stop() { log.info("Closing KafkaConfigBackingStore"); - configLog.stop(); + try { + configLog.stop(); + } finally { + if (ownTopicAdmin != null) { + ownTopicAdmin.close(); + } + } log.info("Closed KafkaConfigBackingStore"); } @@ -479,7 +487,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { Map<String, Object> adminProps = new HashMap<>(originals); ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId); - Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps); + Supplier<TopicAdmin> adminSupplier; + if (topicAdminSupplier != null) { + adminSupplier = topicAdminSupplier; + } else { + // Create our own topic admin supplier that we'll close when we're stopped + ownTopicAdmin = new SharedTopicAdmin(adminProps); + adminSupplier = ownTopicAdmin; + } Map<String, Object> topicSettings = config instanceof DistributedConfig ? ((DistributedConfig) config).configStorageTopicSettings() : Collections.emptyMap(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index 26b47f9..313baf7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -32,6 +32,7 @@ import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConvertingFutureCallback; import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.SharedTopicAdmin; import org.apache.kafka.connect.util.TopicAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +66,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { private KafkaBasedLog<byte[], byte[]> offsetLog; private HashMap<ByteBuffer, ByteBuffer> data; private final Supplier<TopicAdmin> topicAdminSupplier; + private SharedTopicAdmin ownTopicAdmin; @Deprecated public KafkaOffsetBackingStore() { @@ -98,7 +100,14 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { Map<String, Object> adminProps = new HashMap<>(originals); ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId); - Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps); + Supplier<TopicAdmin> adminSupplier; + if (topicAdminSupplier != null) { + adminSupplier = topicAdminSupplier; + } else { + // Create our own topic admin supplier that we'll close when we're stopped + ownTopicAdmin = new SharedTopicAdmin(adminProps); + adminSupplier = ownTopicAdmin; + } Map<String, Object> topicSettings = config instanceof DistributedConfig ? ((DistributedConfig) config).offsetStorageTopicSettings() : Collections.emptyMap(); @@ -140,7 +149,13 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { @Override public void stop() { log.info("Stopping KafkaOffsetBackingStore"); - offsetLog.stop(); + try { + offsetLog.stop(); + } finally { + if (ownTopicAdmin != null) { + ownTopicAdmin.close(); + } + } log.info("Stopped KafkaOffsetBackingStore"); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index abdbba8..bb73e46 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -44,6 +44,7 @@ import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.SharedTopicAdmin; import org.apache.kafka.connect.util.Table; import org.apache.kafka.connect.util.TopicAdmin; import org.slf4j.Logger; @@ -134,6 +135,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore { private String statusTopic; private KafkaBasedLog<String, byte[]> kafkaLog; private int generation; + private SharedTopicAdmin ownTopicAdmin; @Deprecated public KafkaStatusBackingStore(Time time, Converter converter) { @@ -177,7 +179,14 @@ public class KafkaStatusBackingStore implements StatusBackingStore { Map<String, Object> adminProps = new HashMap<>(originals); ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId); - Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps); + Supplier<TopicAdmin> adminSupplier; + if (topicAdminSupplier != null) { + adminSupplier = topicAdminSupplier; + } else { + // Create our own topic admin supplier that we'll close when we're stopped + ownTopicAdmin = new SharedTopicAdmin(adminProps); + adminSupplier = ownTopicAdmin; + } Map<String, Object> topicSettings = config instanceof DistributedConfig ? ((DistributedConfig) config).statusStorageTopicSettings() @@ -226,7 +235,13 @@ public class KafkaStatusBackingStore implements StatusBackingStore { @Override public void stop() { - kafkaLog.stop(); + try { + kafkaLog.stop(); + } finally { + if (ownTopicAdmin != null) { + ownTopicAdmin.close(); + } + } } @Override