[kafka] branch trunk updated: MINOR: cleanup policy doc update (#6692)

2019-05-08 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new cc4a7f0  MINOR: cleanup policy doc update (#6692)
cc4a7f0 is described below

commit cc4a7f01e872e6d0664360d5148af09c876ca72b
Author: Joyce Fee 
AuthorDate: Wed May 8 20:38:28 2019 -0500

MINOR: cleanup policy doc update (#6692)

Reviewers: Guozhang Wang 
---
 docs/design.html | 14 +-
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/docs/design.html b/docs/design.html
index ab8c002..8c9b2eb 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -523,16 +523,20 @@
 Configuring The Log Cleaner
 
 The log cleaner is enabled by default. This will start the pool of cleaner threads.
-To enable log cleaning on a particular topic you can add the log-specific property
-  log.cleanup.policy=compact
-This can be done either at topic creation time or using the alter topic command.
-
+To enable log cleaning on a particular topic, add the log-specific property
+ log.cleanup.policy=compact
+
+The log.cleanup.policy property is a broker configuration setting defined
+in the broker's server.properties file; it affects all of the topics
+in the cluster that do not have a configuration override in place as documented
+here.
+
 The log cleaner can be configured to retain a minimum amount of the uncompacted "head" of the log. This is enabled by setting the compaction time lag.
   log.cleaner.min.compaction.lag.ms
 
 This can be used to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, i.e. the one currently
 being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag.
-
+
 
 Further cleaner configurations are described here.
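
As a concrete illustration of the per-topic override described in the doc change above, here is a minimal sketch that applies cleanup.policy=compact at topic creation time through the Java AdminClient. The topic name, bootstrap address, and partition/replica counts are hypothetical placeholders, not anything from the patch itself.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CompactedTopicExample {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        // Hypothetical bootstrap address; point this at your own cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Per-topic override: this topic is compacted regardless of the
            // broker-wide log.cleanup.policy default in server.properties.
            final NewTopic topic = new NewTopic("compacted-topic", 1, (short) 1)
                .configs(Collections.singletonMap("cleanup.policy", "compact"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}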
 



[kafka] branch trunk updated: KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542)

2019-05-08 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new e6cff21  KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542)
e6cff21 is described below

commit e6cff21fd8c5add0eb7e55417a91f0530a7d3a32
Author: Dhruvil Shah 
AuthorDate: Wed May 8 09:31:05 2019 -0700

KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542)

Implements KIP-361 to provide a consumer configuration that specifies whether subscribing to or assigning a non-existent topic results in the topic being created automatically.
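
A minimal sketch of what this looks like from the client side, assuming the allow.auto.create.topics key and the ConsumerConfig constant introduced by this patch; the bootstrap address, group id, and topic name are hypothetical.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NoAutoCreateConsumer {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // hypothetical
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // KIP-361: do not trigger broker-side auto topic creation from this
        // consumer, even if the broker has auto.create.topics.enable=true.
        props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("example-topic")); // hypothetical topic
            consumer.poll(Duration.ofMillis(100));
        }
    }
}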

Reviewers: Jason Gustafson 
---
 .../kafka/clients/ManualMetadataUpdater.java   |  10 +-
 .../java/org/apache/kafka/clients/Metadata.java|  45 +++--
 .../org/apache/kafka/clients/MetadataUpdater.java  |   8 +-
 .../org/apache/kafka/clients/NetworkClient.java|  14 ++-
 .../admin/internals/AdminMetadataManager.java  |   3 +-
 .../kafka/clients/consumer/ConsumerConfig.java |  14 ++-
 .../kafka/clients/consumer/KafkaConsumer.java  |   3 +-
 .../consumer/internals/ConsumerMetadata.java   |  10 +-
 .../producer/internals/ProducerMetadata.java   |   7 +-
 .../apache/kafka/clients/NetworkClientTest.java|  40 +++-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   4 +-
 .../internals/AbstractCoordinatorTest.java |   2 +-
 .../internals/ConsumerCoordinatorTest.java |   4 +-
 .../consumer/internals/ConsumerMetadataTest.java   |   4 +-
 .../internals/ConsumerNetworkClientTest.java   |  15 ++-
 .../clients/consumer/internals/FetcherTest.java|   2 +-
 .../internals/OffsetForLeaderEpochClientTest.java  |   2 +-
 .../kafka/api/ConsumerTopicCreationTest.scala  | 107 +
 18 files changed, 228 insertions(+), 66 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index ec007a6..7fb0224 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.slf4j.Logger;
@@ -74,10 +74,10 @@ public class ManualMetadataUpdater implements MetadataUpdater {
 }
 
 @Override
-    public void handleAuthenticationFailure(AuthenticationException exception) {
-        // We don't fail the broker on authentication failures, but there is sufficient information in the broker logs
-        // to identify the failure.
-        log.debug("An authentication error occurred in broker-to-broker communication.", exception);
+    public void handleFatalException(KafkaException exception) {
+        // We don't fail the broker on failures, but there should be sufficient information in the logs indicating the reason
+        // for failure.
+        log.debug("An error occurred in broker-to-broker communication.", exception);
 }
 
 @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index ef01b4b..ae75045 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -66,8 +65,8 @@ public class Metadata implements Closeable {
 private int requestVersion; // bumped on every new topic addition
 private long lastRefreshMs;
 private long lastSuccessfulRefreshMs;
-private AuthenticationException authenticationException;
-private KafkaException metadataException;
+private KafkaException fatalException;
+private KafkaException recoverableException;
 private MetadataCache cache = MetadataCache.empty();
 private boolean needUpdate;
 private final ClusterResourceListeners clusterResourceListeners;
@@ -202,25 +201,13 @@ public class Metadata implements Closeable {
 }
 
 /**
- * If any non-retriable authentication exceptions were encountered during
- * metadata 

[kafka] annotated tag 2.2.1-rc0 updated (85d4f26 -> b922e6c)

2019-05-08 Thread vahid
This is an automated email from the ASF dual-hosted git repository.

vahid pushed a change to annotated tag 2.2.1-rc0
in repository https://gitbox.apache.org/repos/asf/kafka.git.


*** WARNING: tag 2.2.1-rc0 was modified! ***

from 85d4f26  (commit)
  to b922e6c  (tag)
 tagging 85d4f26ebe7de9e796bd8430b88c85d8b9045f2e (commit)
 replaces 2.2.0
  by Vahid Hashemian
  on Tue May 7 21:53:23 2019 -0700

- Log -
2.2.1-rc0
---


No new revisions were added by this update.

Summary of changes:



[kafka] branch trunk updated: BUGFIX: Add missing recording of close of stand-by task (#6663)

2019-05-08 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 83823ae  BUGFIX: Add missing recording of close of stand-by task (#6663)
83823ae is described below

commit 83823aedf2e2d176004402152b69bd67f97d8e12
Author: cadonna 
AuthorDate: Wed May 8 17:26:25 2019 +0200

BUGFIX: Add missing recording of close of stand-by task (#6663)

Adds recording of close of a stand-by task to the task-closed metric
Adds unit tests to verify the recording
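
For readers unfamiliar with the metrics plumbing, here is a self-contained sketch of the sensor pattern this patch relies on: recording into a cumulative Total stat and reading the value back from the registry. The metric group and names are illustrative, not the ones StreamsMetricsImpl actually registers.

import java.util.Collections;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Total;

public class TaskClosedSensorSketch {
    public static void main(final String[] args) {
        final Metrics metrics = new Metrics();

        // Register a sensor at INFO recording level, like the task-closed sensor.
        final Sensor closeTaskSensor = metrics.sensor("task-closed", Sensor.RecordingLevel.INFO);
        final MetricName name = metrics.metricName(
            "task-closed-total", "example-metrics",
            "Total number of closed tasks", Collections.emptyMap());
        closeTaskSensor.add(name, new Total());

        // Each close() call records once against the sensor.
        closeTaskSensor.record();

        // The accumulated value can be read back from the registry; prints 1.0.
        System.out.println(metrics.metric(name).metricValue());
        metrics.close();
    }
}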

Reviewers: Guozhang Wang , John Roesler 

---
 .../streams/processor/internals/StandbyTask.java   |   4 +
 .../processor/internals/StandbyTaskTest.java   | 175 ++---
 2 files changed, 159 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 749b2ed..424ded4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
@@ -37,6 +38,7 @@ import java.util.Map;
 public class StandbyTask extends AbstractTask {
 
     private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
+    private final Sensor closeTaskSensor;
 
 /**
  * Create {@link StandbyTask} with its assigned partitions
@@ -59,6 +61,7 @@ public class StandbyTask extends AbstractTask {
 final StateDirectory stateDirectory) {
         super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
 
+        closeTaskSensor = metrics.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
         processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
 }
 
@@ -132,6 +135,7 @@ public class StandbyTask extends AbstractTask {
 @Override
     public void close(final boolean clean,
                       final boolean isZombie) {
+        closeTaskSensor.record();
 if (!taskInitialized) {
 return;
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 6e7655a..8c8811d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -20,10 +20,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -45,6 +49,7 @@ import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -136,7 +141,10 @@ public class StandbyTaskTest {
 }
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-    private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer<>(new IntegerSerializer(), new IntegerSerializer());
+    private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer<>(
+        new IntegerSerializer(),
+        new IntegerSerializer()
+    );
 private final StoreChangelogReader changelogReader = new