kafka git commit: KAFKA-2686: Reset needsPartitionAssignment in SubscriptionState.assign()

2015-10-22 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk bf292a6fa -> aa56dfb9e


KAFKA-2686: Reset needsPartitionAssignment in SubscriptionState.assign()

Author: Guozhang Wang 

Reviewers: Jason Gustafson, Jun Rao

Closes #352 from guozhangwang/K2686


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa56dfb9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa56dfb9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa56dfb9

Branch: refs/heads/trunk
Commit: aa56dfb9e7cea19faa545a13d42d499a6958cbef
Parents: bf292a6
Author: Guozhang Wang 
Authored: Thu Oct 22 21:06:10 2015 -0700
Committer: Guozhang Wang 
Committed: Thu Oct 22 21:06:10 2015 -0700

--
 .../kafka/clients/consumer/KafkaConsumer.java    | 20 -
 .../kafka/clients/consumer/MockConsumer.java     |  4 +-
 .../consumer/internals/ConsumerCoordinator.java  |  2 +-
 .../consumer/internals/SubscriptionState.java    | 38 +
 .../clients/consumer/KafkaConsumerTest.java      | 32 
 .../internals/ConsumerCoordinatorTest.java       | 18 ++---
 .../clients/consumer/internals/FetcherTest.java  | 36 -
 .../internals/SubscriptionStateTest.java         | 81 ++--
 .../clients/producer/KafkaProducerTest.java      |  1 -
 9 files changed, 142 insertions(+), 90 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index cd166f0..06a9239 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -629,6 +629,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * assign partitions. Topic subscriptions are not incremental. This list will replace the current
      * assignment (if there is one). Note that it is not possible to combine topic subscription with group management
      * with manual partition assignment through {@link #assign(List)}.
+     *
+     * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
+     *
      * <p>
      * As part of group management, the consumer will keep track of the list of consumers that belong to a particular
      * group and will trigger a rebalance operation if one of the following events trigger -
@@ -653,9 +656,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
         acquire();
         try {
-            log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
-            this.subscriptions.subscribe(topics, listener);
-            metadata.setTopics(subscriptions.groupSubscription());
+            if (topics.isEmpty()) {
+                // treat subscribing to empty topic list as the same as unsubscribing
+                this.unsubscribe();
+            } else {
+                log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
+                this.subscriptions.subscribe(topics, listener);
+                metadata.setTopics(subscriptions.groupSubscription());
+            }
         } finally {
             release();
         }
@@ -666,6 +674,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * assign partitions. Topic subscriptions are not incremental. This list will replace the current
      * assignment (if there is one). It is not possible to combine topic subscription with group management
      * with manual partition assignment through {@link #assign(List)}.
+     *
+     * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
+     *
      * <p>
      * This is a short-hand for {@link #subscribe(List, ConsumerRebalanceListener)}, which
      * uses a noop listener. If you need the ability to either seek to particular offsets, you should prefer
@@ -715,6 +726,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void unsubscribe() {
         acquire();
         try {
+            log.debug("Unsubscribed all topics or patterns and assigned partitions");
            this.subscriptions.unsubscribe();
            this.coordinator.resetGeneration();
            this.metadata.needMetadataForAllTopics(false);
@@ -739,7 +751,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         acquire();
         try {
             log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", 

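For illustration only, a minimal sketch of the behavior this patch documents: subscribing with an empty topic list now has the same effect as unsubscribe(). The snippet is not part of the patch; the broker address, group id, and topic name are hypothetical.

import java.util.{Arrays, Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer

object EmptySubscribeSketch extends App {
  // Hypothetical connection settings, for illustration only.
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "example-group")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)

  consumer.subscribe(Arrays.asList("my-topic"))        // normal group-managed subscription
  consumer.subscribe(Collections.emptyList[String]())  // with this patch: same effect as consumer.unsubscribe()

  consumer.close()
}
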
kafka git commit: KAFKA-2671: Enable starting Kafka server with a Properties object

2015-10-22 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/trunk aa56dfb9e -> 701c46b3a


KAFKA-2671: Enable starting Kafka server with a Properties object

Author: Ashish Singh 

Reviewers: Eno Thereska, Gwen Shapira

Closes #330 from SinghAsDev/KAFKA-2671


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/701c46b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/701c46b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/701c46b3

Branch: refs/heads/trunk
Commit: 701c46b3a32da78008ae48298778fca8b5a16bce
Parents: aa56dfb
Author: Ashish Singh 
Authored: Thu Oct 22 22:27:46 2015 -0700
Committer: Gwen Shapira 
Committed: Thu Oct 22 22:27:46 2015 -0700

--
 core/src/main/scala/kafka/Kafka.scala   | 12 +---
 .../main/scala/kafka/server/KafkaServerStartable.scala  | 11 ++-
 2 files changed, 15 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/701c46b3/core/src/main/scala/kafka/Kafka.scala
--
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 6af7b80..6b551ce 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -19,13 +19,13 @@ package kafka
 
 import java.util.Properties
 
-import scala.collection.JavaConversions._
 import joptsimple.OptionParser
-import metrics.KafkaMetricsReporter
-import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
-import kafka.utils.{VerifiableProperties, CommandLineUtils, Logging}
+import kafka.server.{KafkaServer, KafkaServerStartable}
+import kafka.utils.{CommandLineUtils, Logging}
 import org.apache.kafka.common.utils.Utils
 
+import scala.collection.JavaConversions._
+
 object Kafka extends Logging {
 
   def getPropsFromArgs(args: Array[String]): Properties = {
@@ -55,9 +55,7 @@ object Kafka extends Logging {
   def main(args: Array[String]): Unit = {
     try {
       val serverProps = getPropsFromArgs(args)
-      val serverConfig = KafkaConfig.fromProps(serverProps)
-      KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
-      val kafkaServerStartable = new KafkaServerStartable(serverConfig)
+      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
 
       // attach shutdown handler to catch control-c
       Runtime.getRuntime().addShutdownHook(new Thread() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/701c46b3/core/src/main/scala/kafka/server/KafkaServerStartable.scala
--
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index df521b3..fc98912 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -17,8 +17,17 @@
 
 package kafka.server
 
-import kafka.utils.Logging
+import java.util.Properties
 
+import kafka.metrics.KafkaMetricsReporter
+import kafka.utils.{VerifiableProperties, Logging}
+
+object KafkaServerStartable {
+  def fromProps(serverProps: Properties) = {
+    KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
+    new KafkaServerStartable(KafkaConfig.fromProps(serverProps))
+  }
+}
 
 class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
   private val server = new KafkaServer(serverConfig)
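
For illustration only, a minimal sketch of the new entry point (not part of the patch; the broker property values are placeholders):

import java.util.Properties

import kafka.server.KafkaServerStartable

object EmbeddedBrokerSketch extends App {
  // Placeholder broker settings; any valid server.properties content works here.
  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")
  props.put("broker.id", "0")
  props.put("log.dirs", "/tmp/kafka-logs")

  // fromProps parses the KafkaConfig and starts the metrics reporters internally,
  // so callers no longer need to touch KafkaConfig or KafkaMetricsReporter directly.
  val startable = KafkaServerStartable.fromProps(props)
  startable.startup

  // ... later, shut down cleanly.
  startable.shutdown
  startable.awaitShutdown
}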



kafka git commit: KAFKA-2678; partition level lag metrics can be negative

2015-10-22 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 65922b538 -> 2e25f899a


KAFKA-2678; partition level lag metrics can be negative

Author: Dong Lin 
Author: Dong Lin 

Reviewers: Guozhang Wang

Closes #346 from lindong28/KAFKA-2678


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e25f899
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e25f899
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e25f899

Branch: refs/heads/trunk
Commit: 2e25f899a118e7d4d5eb89118e447a87ad02f71c
Parents: 65922b5
Author: Dong Lin 
Authored: Thu Oct 22 09:26:05 2015 -0700
Committer: Guozhang Wang 
Committed: Thu Oct 22 09:26:05 2015 -0700

--
 core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/2e25f899/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
--
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 21c7e3e..eba2d5a 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -132,7 +132,7 @@ abstract class AbstractFetcherThread(name: String,
                   case None => currentPartitionFetchState.offset
                 }
                 partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
-                fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.highWatermark - newOffset
+                fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                 fetcherStats.byteRate.mark(validBytes)
                 // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                 processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
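
A follower can fetch messages beyond the leader's current high watermark, so highWatermark - newOffset can briefly be negative; the one-line fix clamps the reported lag at zero. A tiny sketch with made-up numbers (not part of the patch):

object LagClampSketch extends App {
  val highWatermark = 100L  // high watermark returned by the leader in the fetch response
  val newOffset     = 105L  // follower's next fetch offset after appending the fetched messages

  val lagBefore = highWatermark - newOffset                // -5: what the metric could report before
  val lagAfter  = Math.max(0L, highWatermark - newOffset)  // 0: what it reports after this patch
  println(s"before=$lagBefore after=$lagAfter")
}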



kafka git commit: MINOR: Restore `SslConsumerTest` which was accidentally deleted in client-side assignment commit

2015-10-22 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 2e25f899a -> bf292a6fa


MINOR: Restore `SslConsumerTest` which was accidentally deleted in client-side assignment commit

Probably happened while resolving conflicts, commit: 86eb74d9236c586af5889fe79f4b9e066c9c2af3

Author: Ismael Juma 

Reviewers: Jason Gustafson

Closes #350 from ijuma/restore-ssl-consumer-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bf292a6f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bf292a6f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bf292a6f

Branch: refs/heads/trunk
Commit: bf292a6fa5b52844edfe256e8f1dd42da432d0c9
Parents: 2e25f89
Author: Ismael Juma 
Authored: Thu Oct 22 09:52:09 2015 -0700
Committer: Guozhang Wang 
Committed: Thu Oct 22 09:52:09 2015 -0700

--
 .../integration/kafka/api/SslConsumerTest.scala | 22 
 1 file changed, 22 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/bf292a6f/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
--
diff --git a/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
new file mode 100644
index 0000000..1d13d88
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
@@ -0,0 +1,22 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.io.File
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+
+class SslConsumerTest extends BaseConsumerTest {
+  override protected def securityProtocol = SecurityProtocol.SSL
+  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+}