kafka git commit: KAFKA-2686: Reset needsPartitionAssignment in SubscriptionState.assign()
Repository: kafka Updated Branches: refs/heads/trunk bf292a6fa -> aa56dfb9e KAFKA-2686: Reset needsPartitionAssignment in SubscriptionState.assign() Author: Guozhang Wang Reviewers: Jason Gustafson, Jun Rao Closes #352 from guozhangwang/K2686 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa56dfb9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa56dfb9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa56dfb9 Branch: refs/heads/trunk Commit: aa56dfb9e7cea19faa545a13d42d499a6958cbef Parents: bf292a6 Author: Guozhang Wang Authored: Thu Oct 22 21:06:10 2015 -0700 Committer: Guozhang Wang Committed: Thu Oct 22 21:06:10 2015 -0700 -- .../kafka/clients/consumer/KafkaConsumer.java | 20 - .../kafka/clients/consumer/MockConsumer.java | 4 +- .../consumer/internals/ConsumerCoordinator.java | 2 +- .../consumer/internals/SubscriptionState.java | 38 + .../clients/consumer/KafkaConsumerTest.java | 32 .../internals/ConsumerCoordinatorTest.java | 18 ++--- .../clients/consumer/internals/FetcherTest.java | 36 - .../internals/SubscriptionStateTest.java | 81 ++-- .../clients/producer/KafkaProducerTest.java | 1 - 9 files changed, 142 insertions(+), 90 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index cd166f0..06a9239 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -629,6 +629,9 @@ public class KafkaConsumer implements Consumer { * assign partitions. Topic subscriptions are not incremental. This list will replace the current * assignment (if there is one). 
Note that it is not possible to combine topic subscription with group management * with manual partition assignment through {@link #assign(List)}. + * + * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}. + * * * As part of group management, the consumer will keep track of the list of consumers that belong to a particular * group and will trigger a rebalance operation if one of the following events trigger - @@ -653,9 +656,14 @@ public class KafkaConsumer implements Consumer { public void subscribe(List topics, ConsumerRebalanceListener listener) { acquire(); try { -log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", ")); -this.subscriptions.subscribe(topics, listener); -metadata.setTopics(subscriptions.groupSubscription()); +if (topics.isEmpty()) { +// treat subscribing to empty topic list as the same as unsubscribing +this.unsubscribe(); +} else { +log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", ")); +this.subscriptions.subscribe(topics, listener); +metadata.setTopics(subscriptions.groupSubscription()); +} } finally { release(); } @@ -666,6 +674,9 @@ public class KafkaConsumer implements Consumer { * assign partitions. Topic subscriptions are not incremental. This list will replace the current * assignment (if there is one). It is not possible to combine topic subscription with group management * with manual partition assignment through {@link #assign(List)}. + * + * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}. + * * * This is a short-hand for {@link #subscribe(List, ConsumerRebalanceListener)}, which * uses a noop listener. 
If you need the ability to either seek to particular offsets, you should prefer @@ -715,6 +726,7 @@ public class KafkaConsumer implements Consumer { public void unsubscribe() { acquire(); try { +log.debug("Unsubscribed all topics or patterns and assigned partitions"); this.subscriptions.unsubscribe(); this.coordinator.resetGeneration(); this.metadata.needMetadataForAllTopics(false); @@ -739,7 +751,7 @@ public class KafkaConsumer implements Consumer { acquire(); try { log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ",
kafka git commit: KAFKA-2671: Enable starting Kafka server with a Properties object
Repository: kafka Updated Branches: refs/heads/trunk aa56dfb9e -> 701c46b3a KAFKA-2671: Enable starting Kafka server with a Properties object Author: Ashish Singh Reviewers: Eno Thereska, Gwen Shapira Closes #330 from SinghAsDev/KAFKA-2671 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/701c46b3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/701c46b3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/701c46b3 Branch: refs/heads/trunk Commit: 701c46b3a32da78008ae48298778fca8b5a16bce Parents: aa56dfb Author: Ashish Singh Authored: Thu Oct 22 22:27:46 2015 -0700 Committer: Gwen Shapira Committed: Thu Oct 22 22:27:46 2015 -0700 -- core/src/main/scala/kafka/Kafka.scala | 12 +--- .../main/scala/kafka/server/KafkaServerStartable.scala | 11 ++- 2 files changed, 15 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/701c46b3/core/src/main/scala/kafka/Kafka.scala -- diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 6af7b80..6b551ce 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -19,13 +19,13 @@ package kafka import java.util.Properties -import scala.collection.JavaConversions._ import joptsimple.OptionParser -import metrics.KafkaMetricsReporter -import server.{KafkaConfig, KafkaServerStartable, KafkaServer} -import kafka.utils.{VerifiableProperties, CommandLineUtils, Logging} +import kafka.server.{KafkaServer, KafkaServerStartable} +import kafka.utils.{CommandLineUtils, Logging} import org.apache.kafka.common.utils.Utils +import scala.collection.JavaConversions._ + object Kafka extends Logging { def getPropsFromArgs(args: Array[String]): Properties = { @@ -55,9 +55,7 @@ object Kafka extends Logging { def main(args: Array[String]): Unit = { try { val serverProps = getPropsFromArgs(args) - val serverConfig = KafkaConfig.fromProps(serverProps) - 
KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps)) - val kafkaServerStartable = new KafkaServerStartable(serverConfig) + val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps) // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread() { http://git-wip-us.apache.org/repos/asf/kafka/blob/701c46b3/core/src/main/scala/kafka/server/KafkaServerStartable.scala -- diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala index df521b3..fc98912 100644 --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala @@ -17,8 +17,17 @@ package kafka.server -import kafka.utils.Logging +import java.util.Properties +import kafka.metrics.KafkaMetricsReporter +import kafka.utils.{VerifiableProperties, Logging} + +object KafkaServerStartable { + def fromProps(serverProps: Properties) = { +KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps)) +new KafkaServerStartable(KafkaConfig.fromProps(serverProps)) + } +} class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { private val server = new KafkaServer(serverConfig)
kafka git commit: KAFKA-2678; partition level lag metrics can be negative
Repository: kafka Updated Branches: refs/heads/trunk 65922b538 -> 2e25f899a KAFKA-2678; partition level lag metrics can be negative Author: Dong Lin Author: Dong Lin Reviewers: Guozhang Wang Closes #346 from lindong28/KAFKA-2678 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e25f899 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e25f899 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e25f899 Branch: refs/heads/trunk Commit: 2e25f899a118e7d4d5eb89118e447a87ad02f71c Parents: 65922b5 Author: Dong Lin Authored: Thu Oct 22 09:26:05 2015 -0700 Committer: Guozhang Wang Committed: Thu Oct 22 09:26:05 2015 -0700 -- core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/2e25f899/core/src/main/scala/kafka/server/AbstractFetcherThread.scala -- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 21c7e3e..eba2d5a 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -132,7 +132,7 @@ abstract class AbstractFetcherThread(name: String, case None => currentPartitionFetchState.offset } partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset)) -fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.highWatermark - newOffset +fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset) fetcherStats.byteRate.mark(validBytes) // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
kafka git commit: MINOR: Restore `SslConsumerTest` which was accidentally deleted in client-side assignment commit
Repository: kafka Updated Branches: refs/heads/trunk 2e25f899a -> bf292a6fa MINOR: Restore `SslConsumerTest` which was accidentally deleted in client-side assignment commit Probably happened while resolving conflicts, commit: 86eb74d9236c586af5889fe79f4b9e066c9c2af3 Author: Ismael Juma Reviewers: Jason Gustafson Closes #350 from ijuma/restore-ssl-consumer-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bf292a6f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bf292a6f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bf292a6f Branch: refs/heads/trunk Commit: bf292a6fa5b52844edfe256e8f1dd42da432d0c9 Parents: 2e25f89 Author: Ismael Juma Authored: Thu Oct 22 09:52:09 2015 -0700 Committer: Guozhang Wang Committed: Thu Oct 22 09:52:09 2015 -0700 -- .../integration/kafka/api/SslConsumerTest.scala | 22 1 file changed, 22 insertions(+) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/bf292a6f/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala -- diff --git a/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala new file mode 100644 index 000..1d13d88 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.api + +import java.io.File + +import org.apache.kafka.common.protocol.SecurityProtocol + +class SslConsumerTest extends BaseConsumerTest { + override protected def securityProtocol = SecurityProtocol.SSL + override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) +}