[kafka] branch trunk updated: MINOR: cleanup policy doc update (#6692)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new cc4a7f0 MINOR: cleanup policy doc update (#6692) cc4a7f0 is described below commit cc4a7f01e872e6d0664360d5148af09c876ca72b Author: Joyce Fee AuthorDate: Wed May 8 20:38:28 2019 -0500 MINOR: cleanup policy doc update (#6692) Reviewers: Guozhang Wang --- docs/design.html | 14 +- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/docs/design.html b/docs/design.html index ab8c002..8c9b2eb 100644 --- a/docs/design.html +++ b/docs/design.html @@ -523,16 +523,20 @@ Configuring The Log Cleaner The log cleaner is enabled by default. This will start the pool of cleaner threads. -To enable log cleaning on a particular topic you can add the log-specific property - log.cleanup.policy=compact -This can be done either at topic creation time or using the alter topic command. - +To enable log cleaning on a particular topic, add the log-specific property + log.cleanup.policy=compact + +The log.cleanup.policy property is a broker configuration setting defined +in the broker's server.properties file; it affects all of the topics +in the cluster that do not have a configuration override in place as documented +here. + The log cleaner can be configured to retain a minimum amount of the uncompacted "head" of the log. This is enabled by setting the compaction time lag. log.cleaner.min.compaction.lag.ms This can be used to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, i.e. the one currently being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag. - + Further cleaner configurations are described here.
[kafka] branch trunk updated: KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e6cff21 KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542) e6cff21 is described below commit e6cff21fd8c5add0eb7e55417a91f0530a7d3a32 Author: Dhruvil Shah AuthorDate: Wed May 8 09:31:05 2019 -0700 KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542) Implements KIP-361 to provide a consumer configuration to specify whether subscribing or assigning a non-existent topic would result in it being automatically created or not. Reviewers: Jason Gustafson --- .../kafka/clients/ManualMetadataUpdater.java | 10 +- .../java/org/apache/kafka/clients/Metadata.java| 45 +++-- .../org/apache/kafka/clients/MetadataUpdater.java | 8 +- .../org/apache/kafka/clients/NetworkClient.java| 14 ++- .../admin/internals/AdminMetadataManager.java | 3 +- .../kafka/clients/consumer/ConsumerConfig.java | 14 ++- .../kafka/clients/consumer/KafkaConsumer.java | 3 +- .../consumer/internals/ConsumerMetadata.java | 10 +- .../producer/internals/ProducerMetadata.java | 7 +- .../apache/kafka/clients/NetworkClientTest.java| 40 +++- .../kafka/clients/consumer/KafkaConsumerTest.java | 4 +- .../internals/AbstractCoordinatorTest.java | 2 +- .../internals/ConsumerCoordinatorTest.java | 4 +- .../consumer/internals/ConsumerMetadataTest.java | 4 +- .../internals/ConsumerNetworkClientTest.java | 15 ++- .../clients/consumer/internals/FetcherTest.java| 2 +- .../internals/OffsetForLeaderEpochClientTest.java | 2 +- .../kafka/api/ConsumerTopicCreationTest.scala | 107 + 18 files changed, 228 insertions(+), 66 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java index ec007a6..7fb0224 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.clients; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; -import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestHeader; import org.slf4j.Logger; @@ -74,10 +74,10 @@ public class ManualMetadataUpdater implements MetadataUpdater { } @Override -public void handleAuthenticationFailure(AuthenticationException exception) { -// We don't fail the broker on authentication failures, but there is sufficient information in the broker logs -// to identify the failure. -log.debug("An authentication error occurred in broker-to-broker communication.", exception); +public void handleFatalException(KafkaException exception) { +// We don't fail the broker on failures, but there should be sufficient information in the logs indicating the reason +// for failure. 
+log.debug("An error occurred in broker-to-broker communication.", exception); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index ef01b4b..ae75045 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.InvalidMetadataException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.TopicAuthorizationException; @@ -66,8 +65,8 @@ public class Metadata implements Closeable { private int requestVersion; // bumped on every new topic addition private long lastRefreshMs; private long lastSuccessfulRefreshMs; -private AuthenticationException authenticationException; -private KafkaException metadataException; +private KafkaException fatalException; +private KafkaException recoverableException; private MetadataCache cache = MetadataCache.empty(); private boolean needUpdate; private final ClusterResourceListeners clusterResourceListeners; @@ -202,25 +201,13 @@ public class Metadata implements Closeable { } /** - * If any non-retriable authentication exceptions were encountered during - * metadata
[kafka] annotated tag 2.2.1-rc0 updated (85d4f26 -> b922e6c)
This is an automated email from the ASF dual-hosted git repository. vahid pushed a change to annotated tag 2.2.1-rc0 in repository https://gitbox.apache.org/repos/asf/kafka.git. *** WARNING: tag 2.2.1-rc0 was modified! *** from: 85d4f26 (commit); to: b922e6c (tag); tagging: 85d4f26ebe7de9e796bd8430b88c85d8b9045f2e (commit); replaces: 2.2.0; by: Vahid Hashemian; on: Tue May 7 21:53:23 2019 -0700. --- Log: 2.2.1-rc0 --- No new revisions were added by this update. Summary of changes:
[kafka] branch trunk updated: BUGFIX: Add missing recording of close of stand-by task (#6663)
This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 83823ae BUGFIX: Add missing recording of close of stand-by task (#6663) 83823ae is described below commit 83823aedf2e2d176004402152b69bd67f97d8e12 Author: cadonna AuthorDate: Wed May 8 17:26:25 2019 +0200 BUGFIX: Add missing recording of close of stand-by task (#6663) Adds recording of close of a stand-by task to the task-closed metric Adds unit tests to verify the recording Reviewers: Guozhang Wang , John Roesler --- .../streams/processor/internals/StandbyTask.java | 4 + .../processor/internals/StandbyTaskTest.java | 175 ++--- 2 files changed, 159 insertions(+), 20 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 749b2ed..424ded4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.TaskId; @@ -37,6 +38,7 @@ import java.util.Map; public class StandbyTask extends AbstractTask { private Map checkpointedOffsets = new HashMap<>(); +private final Sensor closeTaskSensor; /** * Create {@link StandbyTask} with its assigned partitions @@ -59,6 +61,7 @@ public class StandbyTask extends AbstractTask { final StateDirectory stateDirectory) { super(id, 
partitions, topology, consumer, changelogReader, true, stateDirectory, config); +closeTaskSensor = metrics.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO); processorContext = new StandbyContextImpl(id, config, stateMgr, metrics); } @@ -132,6 +135,7 @@ public class StandbyTask extends AbstractTask { @Override public void close(final boolean clean, final boolean isZombie) { +closeTaskSensor.record(); if (!taskInitialized) { return; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 6e7655a..8c8811d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -20,10 +20,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; @@ -45,6 +49,7 @@ import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder; import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.TaskId; +import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; @@ -136,7 +141,10 @@ public class StandbyTaskTest { } private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); -private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer<>(new IntegerSerializer(), new IntegerSerializer()); +private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer<>( +new IntegerSerializer(), +new IntegerSerializer() +); private final StoreChangelogReader changelogReader = new