This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 9646602  KAFKA-7402: Implement KIP-376 AutoCloseable additions
9646602 is described below

commit 9646602d6832ad0a5f2e9b65af5df1a80a571691
Author: Yishun Guan <gyis...@gmail.com>
AuthorDate: Fri Nov 16 15:58:47 2018 -0800

    KAFKA-7402: Implement KIP-376 AutoCloseable additions
---
 .../java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java | 2 +-
 .../apache/kafka/clients/consumer/internals/AbstractCoordinator.java | 2 +-
 .../src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java | 2 +-
 clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java | 2 +-
 clients/src/main/java/org/apache/kafka/common/network/Selector.java | 2 +-
 .../main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java | 2 +-
 .../internals/expiring/ExpiringCredentialRefreshingLogin.java | 2 +-
 .../src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java | 2 +-
 .../src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java | 2 +-
 .../main/java/org/apache/kafka/connect/transforms/TimestampRouter.java | 2 +-
 streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +-
 .../org/apache/kafka/streams/processor/internals/RecordCollector.java | 2 +-
 tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java | 2 +-
 13 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
index 763fe51..6af4705 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -40,7 +40,7 @@ import java.util.Map;
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
  */
-public interface ConsumerInterceptor<K, V> extends Configurable {
+public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {

     /**
      * This is called just before the records are returned by
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 335e0f2..fb710f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -933,7 +933,7 @@ public abstract class AbstractCoordinator implements Closeable {
         }
     }

-    private class HeartbeatThread extends KafkaThread {
+    private class HeartbeatThread extends KafkaThread implements AutoCloseable {
         private boolean enabled = false;
         private boolean closed = false;
         private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
index 995bdaa..55d6b25 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.Configurable;
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
  */
-public interface MetricsReporter extends Configurable {
+public interface MetricsReporter extends Configurable, AutoCloseable {

     /**
      * This is called when the reporter is first registered to initially register all existing metrics
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 47b1375..3bca276 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -63,7 +63,7 @@ import java.util.function.Supplier;
  * to memory pressure or other reasons</li>
  * </ul>
  */
-public class KafkaChannel {
+public class KafkaChannel implements AutoCloseable {
     private static final long MIN_REAUTH_INTERVAL_ONE_SECOND_NANOS = 1000 * 1000 * 1000;

     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index b960fcb..843d46d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -983,7 +983,7 @@ public class Selector implements Selectable, AutoCloseable {
         return deque == null ? 0 : deque.size();
     }

-    private class SelectorMetrics {
+    private class SelectorMetrics implements AutoCloseable {
         private final Metrics metrics;
         private final String metricGrpPrefix;
         private final Map<String, String> metricTags;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 0cc2cec..7512c82 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -37,7 +37,7 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
  * and the builder is closed (e.g. the Producer), it's important to call `closeForRecordAppends` when the former happens.
  * This will release resources like compression buffers that can be relatively large (64 KB for LZ4).
  */
-public class MemoryRecordsBuilder {
+public class MemoryRecordsBuilder implements AutoCloseable {
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
     private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
         @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
index 546b158..f6bb264 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
  * server when the login is a type that has a limited lifetime/will expire. The
  * credentials for the login must implement {@link ExpiringCredential}.
  */
-public abstract class ExpiringCredentialRefreshingLogin {
+public abstract class ExpiringCredentialRefreshingLogin implements AutoCloseable {
     /**
      * Class that can be overridden for testing
      */
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index d1e97b2..cf82f86 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -232,7 +232,7 @@ public class ConnectMetrics {
      * the {@link Metrics} class, so that the sensor names are made to be unique (based on the group name)
      * and so the sensors are removed when this group is {@link #close() closed}.
      */
-    public class MetricGroup {
+    public class MetricGroup implements AutoCloseable {
         private final MetricGroupId groupId;
         private final Set<String> sensorNames = new HashSet<>();
         private final String sensorPrefix;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index b7fe74f..e5b990f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -232,7 +232,7 @@ public class WorkerConnector {
                 '}';
     }

-    class ConnectorMetricsGroup implements ConnectorStatus.Listener {
+    class ConnectorMetricsGroup implements ConnectorStatus.Listener, AutoCloseable {
         /**
          * Use {@link AbstractStatus.State} since it has all of the states we want,
          * unlike {@link WorkerConnector.State}.
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
index a0343b8..78384df 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
@@ -28,7 +28,7 @@ import java.util.TimeZone;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

-public class TimestampRouter<R extends ConnectRecord<R>> implements Transformation<R> {
+public class TimestampRouter<R extends ConnectRecord<R>> implements Transformation<R>, AutoCloseable {

     private static final Pattern TOPIC = Pattern.compile("${topic}", Pattern.LITERAL);

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 819732a..bbda11d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -123,7 +123,7 @@ import static org.apache.kafka.common.utils.Utils.getPort;
  * @see org.apache.kafka.streams.Topology
  */
 @InterfaceStability.Evolving
-public class KafkaStreams {
+public class KafkaStreams implements AutoCloseable {

     private static final String JMX_PREFIX = "kafka.streams";
     private static final int DEFAULT_CLOSE_TIMEOUT = 0;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 09de11d..bbfb049 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;

 import java.util.Map;

-public interface RecordCollector {
+public interface RecordCollector extends AutoCloseable {

     <K, V> void send(final String topic,
                      final K key,
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index fafa9e6..f0a991f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -56,7 +56,7 @@ import static net.sourceforge.argparse4j.impl.Arguments.store;
  * If logging is left enabled, log output on stdout can be easily ignored by checking
  * whether a given line is valid JSON.
  */
-public class VerifiableProducer {
+public class VerifiableProducer implements AutoCloseable {

     private final ObjectMapper mapper = new ObjectMapper();
     private final String topic;