[ 
https://issues.apache.org/jira/browse/KAFKA-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690153#comment-16690153
 ] 

ASF GitHub Bot commented on KAFKA-7402:
---------------------------------------

cmccabe closed pull request #5839: KAFKA-7402: Kafka Streams should implement 
AutoCloseable where approp…
URL: https://github.com/apache/kafka/pull/5839
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
index 763fe512217..6af47058e4e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -40,7 +40,7 @@
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
  */
-public interface ConsumerInterceptor<K, V> extends Configurable {
+public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable 
{
 
     /**
      * This is called just before the records are returned by
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d9830877ba7..fa8bab53e72 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -932,7 +932,7 @@ public double measure(MetricConfig config, long now) {
         }
     }
 
-    private class HeartbeatThread extends KafkaThread {
+    private class HeartbeatThread extends KafkaThread implements AutoCloseable 
{
         private boolean enabled = false;
         private boolean closed = false;
         private AtomicReference<RuntimeException> failed = new 
AtomicReference<>(null);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
index 995bdaa6dce..55d6b25a411 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
@@ -25,7 +25,7 @@
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
  */
-public interface MetricsReporter extends Configurable {
+public interface MetricsReporter extends Configurable, AutoCloseable {
 
     /**
      * This is called when the reporter is first registered to initially 
register all existing metrics
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 47b137512ce..3bca276bc73 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -63,7 +63,7 @@
  * to memory pressure or other reasons</li>
  * </ul>
  */
-public class KafkaChannel {
+public class KafkaChannel implements AutoCloseable {
     private static final long MIN_REAUTH_INTERVAL_ONE_SECOND_NANOS = 1000 * 
1000 * 1000;
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index b960fcbd942..843d46dc736 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -983,7 +983,7 @@ public int numStagedReceives(KafkaChannel channel) {
         return deque == null ? 0 : deque.size();
     }
 
-    private class SelectorMetrics {
+    private class SelectorMetrics implements AutoCloseable {
         private final Metrics metrics;
         private final String metricGrpPrefix;
         private final Map<String, String> metricTags;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 0cc2cec31f9..7512c8273da 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -37,7 +37,7 @@
  * and the builder is closed (e.g. the Producer), it's important to call 
`closeForRecordAppends` when the former happens.
  * This will release resources like compression buffers that can be relatively 
large (64 KB for LZ4).
  */
-public class MemoryRecordsBuilder {
+public class MemoryRecordsBuilder implements AutoCloseable {
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
     private static final DataOutputStream CLOSED_STREAM = new 
DataOutputStream(new OutputStream() {
         @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
index 546b158d978..f6bb26464af 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
@@ -37,7 +37,7 @@
  * server when the login is a type that has a limited lifetime/will expire. The
  * credentials for the login must implement {@link ExpiringCredential}.
  */
-public abstract class ExpiringCredentialRefreshingLogin {
+public abstract class ExpiringCredentialRefreshingLogin implements 
AutoCloseable {
     /**
      * Class that can be overridden for testing
      */
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index d1e97b2db63..cf82f867baf 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -232,7 +232,7 @@ public String toString() {
      * the {@link Metrics} class, so that the sensor names are made to be 
unique (based on the group name)
      * and so the sensors are removed when this group is {@link #close() 
closed}.
      */
-    public class MetricGroup {
+    public class MetricGroup implements AutoCloseable {
         private final MetricGroupId groupId;
         private final Set<String> sensorNames = new HashSet<>();
         private final String sensorPrefix;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 55d4860b2e6..93d7e72449f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -231,7 +231,7 @@ public String toString() {
                        '}';
     }
 
-    class ConnectorMetricsGroup implements ConnectorStatus.Listener {
+    class ConnectorMetricsGroup implements ConnectorStatus.Listener, 
AutoCloseable {
         /**
          * Use {@link AbstractStatus.State} since it has all of the states we 
want,
          * unlike {@link WorkerConnector.State}.
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
index a0343b8c95d..78384dfd871 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
@@ -28,7 +28,7 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-public class TimestampRouter<R extends ConnectRecord<R>> implements 
Transformation<R> {
+public class TimestampRouter<R extends ConnectRecord<R>> implements 
Transformation<R>, AutoCloseable {
 
     private static final Pattern TOPIC = Pattern.compile("${topic}", 
Pattern.LITERAL);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 819732ac6d8..bbda11de150 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -123,7 +123,7 @@
  * @see org.apache.kafka.streams.Topology
  */
 @InterfaceStability.Evolving
-public class KafkaStreams {
+public class KafkaStreams implements AutoCloseable {
 
     private static final String JMX_PREFIX = "kafka.streams";
     private static final int DEFAULT_CLOSE_TIMEOUT = 0;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 09de11d5245..bbfb04941e3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -24,7 +24,7 @@
 
 import java.util.Map;
 
-public interface RecordCollector {
+public interface RecordCollector extends AutoCloseable {
 
     <K, V> void send(final String topic,
                      final K key,
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java 
b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index fafa9e6c8c6..f0a991fadae 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -56,7 +56,7 @@
  * If logging is left enabled, log output on stdout can be easily ignored by 
checking
  * whether a given line is valid JSON.
  */
-public class VerifiableProducer {
+public class VerifiableProducer implements AutoCloseable {
 
     private final ObjectMapper mapper = new ObjectMapper();
     private final String topic;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Streams should implement AutoCloseable where appropriate
> --------------------------------------------------------------
>
>                 Key: KAFKA-7402
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7402
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: Yishun Guan
>            Priority: Minor
>              Labels: needs-kip, newbie
>
> Various components in Streams have close methods but do not implement 
> AutoCloseable. This means that they can't be used in try-with-resources 
> blocks.
> Remedying that would simplify our tests and make life easier for users as 
> well.
> KafkaStreams itself is a notable example of this, but we can take the 
> opportunity to look for other components that make sense as AutoCloseable as 
> well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to