kafka git commit: KAFKA-2645: Document potentially breaking changes in the release notes for 0.9.0

2015-10-27 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/trunk e6f9b9e47 -> fc4ef4791


KAFKA-2645: Document potentially breaking changes in the release notes for 0.9.0

Author: Grant Henke 

Reviewers: Gwen Shapira, Guozhang Wang

Closes #337 from granthenke/docs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fc4ef479
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fc4ef479
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fc4ef479

Branch: refs/heads/trunk
Commit: fc4ef479109fbae12470e44f8c12fe9e5f41e179
Parents: e6f9b9e
Author: Grant Henke 
Authored: Tue Oct 27 07:43:19 2015 -0700
Committer: Gwen Shapira 
Committed: Tue Oct 27 07:43:19 2015 -0700

--
 docs/configuration.html |  6 +++---
 docs/documentation.html | 10 +-
 docs/upgrade.html   | 28 +++-
 3 files changed, 31 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/fc4ef479/docs/configuration.html
--
diff --git a/docs/configuration.html b/docs/configuration.html
index c3cc13e..41cf995 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -5,9 +5,9 @@
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at
- 
+
 http://www.apache.org/licenses/LICENSE-2.0
- 
+
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -445,7 +445,7 @@ ZooKeeper also allows you to add a "chroot" path which will make all kafka data
 
 
   inter.broker.protocol.version
-  0.8.3
+  0.9.0
  Version of the protocol brokers will use to communicate with each other. This will default to the current version of the broker, but may need to be set to older versions during a rolling upgrade process. In that scenario, upgraded brokers will use the older version of the protocol and therefore will be able to communicate with brokers that were not yet upgraded. See upgrade section for more details.
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc4ef479/docs/documentation.html
--
diff --git a/docs/documentation.html b/docs/documentation.html
index 8f9b081..860f276 100644
--- a/docs/documentation.html
+++ b/docs/documentation.html
@@ -5,9 +5,9 @@
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at
- 
+
 http://www.apache.org/licenses/LICENSE-2.0
- 
+
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,10 +17,10 @@
 
 
 
-Kafka 0.8.2 Documentation
-Prior releases: 0.7.x, 0.8.0, 0.8.1.X.
+Kafka 0.9.0 Documentation
+Prior releases: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X.
 
-
+
 
 1. Getting Started
  

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc4ef479/docs/upgrade.html
--
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 4b7033a..69bcdc1 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -5,9 +5,9 @@
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at
- 
+
 http://www.apache.org/licenses/LICENSE-2.0
- 
+
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,13 +17,13 @@
 
 1.5 Upgrading From Previous Versions
 
-Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.8.3.0
+Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0
 
-0.8.3.0 has an inter-broker protocol change from previous versions. For a rolling upgrade:
+0.9.0.0 has an inter-broker protocol change from previous versions. For a rolling upgrade:
 
Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=0.8.2.X 
Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it. 
-Once the entire cluster is upgraded, bump the protocol 

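In practice the documented sequence is: pin the inter-broker protocol, roll the brokers onto the new code one at a time, then bump the pin and roll once more. A minimal sketch of the first step in Java; the class name and the config/server.properties path are illustrative only, not part of the patch:

import java.io.FileWriter;
import java.io.IOException;

// Hypothetical helper for step one of the rolling upgrade: pin the
// inter-broker protocol to the old version before swapping broker jars.
public class PinProtocolVersion {
    public static void main(String[] args) throws IOException {
        String serverProperties = args.length > 0 ? args[0] : "config/server.properties";
        try (FileWriter out = new FileWriter(serverProperties, true)) { // append, keep existing config
            out.write("\n# replace X with the exact 0.8.2 point release in use;\n");
            out.write("# bump to 0.9.0 (and roll again) once every broker runs the new code\n");
            out.write("inter.broker.protocol.version=0.8.2.X\n");
        }
    }
}
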
kafka git commit: KAFKA-2452: Add new consumer option to mirror maker.

2015-10-27 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/trunk 2e4aed707 -> 2fd645ac2


KAFKA-2452: Add new consumer option to mirror maker.

Author: Jiangjie Qin 

Reviewers: Ben Stopford, Guozhang Wang

Closes #266 from becketqin/KAFKA-2452


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2fd645ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2fd645ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2fd645ac

Branch: refs/heads/trunk
Commit: 2fd645ac2fec7cf089cb8175ee47823b67a07226
Parents: 2e4aed7
Author: Jiangjie Qin 
Authored: Tue Oct 27 07:59:52 2015 -0700
Committer: Gwen Shapira 
Committed: Tue Oct 27 07:59:52 2015 -0700

--
 .../scala/kafka/consumer/BaseConsumer.scala |  12 +-
 .../main/scala/kafka/tools/MirrorMaker.scala| 554 ---
 2 files changed, 373 insertions(+), 193 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/2fd645ac/core/src/main/scala/kafka/consumer/BaseConsumer.scala
--
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 8b93493..52cd5fa 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -28,13 +28,15 @@ trait BaseConsumer {
   def receive(): BaseConsumerRecord
   def stop()
   def cleanup()
+  def commit()
 }
 
 case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])
 
 class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
   import org.apache.kafka.clients.consumer.KafkaConsumer
-  import scala.collection.JavaConversions._
+
+import scala.collection.JavaConversions._
 
   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
   consumer.subscribe(List(topic))
@@ -58,6 +60,10 @@ class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs:
   override def cleanup() {
     this.consumer.close()
   }
+
+  override def commit() {
+    this.consumer.commitSync()
+  }
 }
 
 class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends BaseConsumer {
@@ -81,5 +87,9 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends B
   override def cleanup() {
     this.consumerConnector.shutdown()
   }
+
+  override def commit() {
+    this.consumerConnector.commitOffsets
+  }
 }
 
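The new commit() hook delegates to commitSync() on the new consumer and to commitOffsets on the old connector, so MirrorMaker can commit offsets through either implementation. A minimal standalone sketch of the new-consumer path (broker address, group id and topic are placeholders; this is not MirrorMaker itself):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitSyncSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "mirror-maker-sketch");       // placeholder
        props.put("enable.auto.commit", "false");           // commit explicitly, as MirrorMaker does
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test-topic"));  // placeholder topic
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<byte[], byte[]> record : records)
                System.out.printf("%s-%d@%d%n", record.topic(), record.partition(), record.offset());
            consumer.commitSync();  // blocks until the consumed offsets are committed
        }
    }
}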

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fd645ac/core/src/main/scala/kafka/tools/MirrorMaker.scala
--
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index fbe0c83..3cf754b 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -20,22 +20,27 @@ package kafka.tools
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.regex.Pattern
 import java.util.{Collections, Properties}
 
 import com.yammer.metrics.core.Gauge
 import joptsimple.OptionParser
-import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
+import kafka.client.ClientUtils
+import kafka.consumer.{BaseConsumerRecord, ConsumerIterator, BaseConsumer, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
 import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.message.MessageAndMetadata
 import kafka.metrics.KafkaMetricsGroup
 import kafka.serializer.DefaultDecoder
 import kafka.utils.{CommandLineUtils, CoreUtils, Logging}
+import org.apache.kafka.clients.consumer.{ConsumerWakeupException, Consumer, ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConversions._
-
+import scala.util.control.ControlThrowable
 
 /**
  * The mirror maker has the following architecture:
@@ -56,12 +61,11 @@ import scala.collection.JavaConversions._
  */
 object MirrorMaker extends Logging with KafkaMetricsGroup {
 
-  private var connectors: Seq[ZookeeperConsumerConnector] = null
   private var producer: 

kafka git commit: KAFKA-2516: Rename o.a.k.client.tools to o.a.k.tools

2015-10-27 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/trunk fc4ef4791 -> 2e4aed707


KAFKA-2516: Rename o.a.k.client.tools to o.a.k.tools

Author: Grant Henke 

Reviewers: Gwen Shapira, Ewen Cheslack-Postava

Closes #310 from granthenke/tools-packaging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e4aed70
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e4aed70
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e4aed70

Branch: refs/heads/trunk
Commit: 2e4aed7070f0283e2c1e0e563fdb3324482463a5
Parents: fc4ef47
Author: Grant Henke 
Authored: Tue Oct 27 07:44:32 2015 -0700
Committer: Gwen Shapira 
Committed: Tue Oct 27 07:44:32 2015 -0700

--
 bin/kafka-verifiable-producer.sh|   2 +-
 checkstyle/import-control.xml   |  14 +-
 .../kafkatest/services/kafka_log4j_appender.py  |   2 +-
 .../performance/producer_performance.py |   2 +-
 .../clients/tools/ProducerPerformance.java  | 201 
 .../clients/tools/ThroughputThrottler.java  | 118 ---
 .../clients/tools/VerifiableLog4jAppender.java  | 162 --
 .../kafka/clients/tools/VerifiableProducer.java | 324 ---
 .../apache/kafka/tools/ProducerPerformance.java | 201 
 .../apache/kafka/tools/ThroughputThrottler.java | 117 +++
 .../kafka/tools/VerifiableLog4jAppender.java| 162 ++
 .../apache/kafka/tools/VerifiableProducer.java  | 324 +++
 12 files changed, 814 insertions(+), 815 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/bin/kafka-verifiable-producer.sh
--
diff --git a/bin/kafka-verifiable-producer.sh b/bin/kafka-verifiable-producer.sh
index d0aa6c5..98fe557 100755
--- a/bin/kafka-verifiable-producer.sh
+++ b/bin/kafka-verifiable-producer.sh
@@ -17,4 +17,4 @@
 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
-exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableProducer $@
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/checkstyle/import-control.xml
--
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e1ea93c..187bee8 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -105,14 +105,14 @@
 
   
 
+  
 
-
-  
-  
-  
-  
-  
-
+  
+
+
+
+
+
   
 
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tests/kafkatest/services/kafka_log4j_appender.py
--
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index 11369aa..ff6bb18 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -38,7 +38,7 @@ class KafkaLog4jAppender(BackgroundThreadService):
 
     @property
     def start_cmd(self):
-        cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableLog4jAppender" \
+        cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender" \
               " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tests/kafkatest/services/performance/producer_performance.py
--
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index f842026..25911af 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -46,7 +46,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
     def _worker(self, idx, node):
         args = self.args.copy()
         args.update({'bootstrap_servers': self.kafka.bootstrap_servers(), 'jmx_port': self.jmx_port, 'client_id': self.client_id})
-        cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance " \
+        cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
               "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % 

kafka git commit: KAFKA-2447: Add capability to KafkaLog4jAppender to be able to use SSL

2015-10-27 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/trunk 2fd645ac2 -> d21cb66e7


KAFKA-2447: Add capability to KafkaLog4jAppender to be able to use SSL

Author: Ashish Singh 

Reviewers: Gwen Shapira, Ismael Juma

Closes #175 from SinghAsDev/KAFKA-2447


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d21cb66e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d21cb66e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d21cb66e

Branch: refs/heads/trunk
Commit: d21cb66e7d21ed3d20fc1e13b9a856f764bb4237
Parents: 2fd645a
Author: Ashish Singh 
Authored: Tue Oct 27 08:45:27 2015 -0700
Committer: Gwen Shapira 
Committed: Tue Oct 27 08:45:27 2015 -0700

--
 .../kafka/log4jappender/KafkaLog4jAppender.java | 88 ++--
 1 file changed, 82 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/d21cb66e/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
--
diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
index 2baef06..94120e2 100644
--- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
+++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -17,11 +17,14 @@
 
 package org.apache.kafka.log4jappender;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SSLConfigs;
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.spi.LoggingEvent;
@@ -36,16 +39,28 @@ import java.util.concurrent.Future;
  */
 public class KafkaLog4jAppender extends AppenderSkeleton {
 
-    private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
-    private static final String COMPRESSION_TYPE_CONFIG = "compression.type";
-    private static final String ACKS_CONFIG = "acks";
-    private static final String RETRIES_CONFIG = "retries";
-    private static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
-    private static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
+    private static final String BOOTSTRAP_SERVERS_CONFIG = ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+    private static final String COMPRESSION_TYPE_CONFIG = ProducerConfig.COMPRESSION_TYPE_CONFIG;
+    private static final String ACKS_CONFIG = ProducerConfig.ACKS_CONFIG;
+    private static final String RETRIES_CONFIG = ProducerConfig.RETRIES_CONFIG;
+    private static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+    private static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+    private static final String SECURITY_PROTOCOL = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+    private static final String SSL_TRUSTSTORE_LOCATION = SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+    private static final String SSL_TRUSTSTORE_PASSWORD = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
+    private static final String SSL_KEYSTORE_TYPE = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG;
+    private static final String SSL_KEYSTORE_LOCATION = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
+    private static final String SSL_KEYSTORE_PASSWORD = SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
 
     private String brokerList = null;
     private String topic = null;
     private String compressionType = null;
+    private String securityProtocol = null;
+    private String sslTruststoreLocation = null;
+    private String sslTruststorePassword = null;
+    private String sslKeystoreType = null;
+    private String sslKeystoreLocation = null;
+    private String sslKeystorePassword = null;
 
     private int retries = 0;
     private int requiredNumAcks = Integer.MAX_VALUE;
@@ -104,6 +119,54 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
 this.syncSend = syncSend;
 }
 
+    public String getSslTruststorePassword() {
+        return sslTruststorePassword;
+    }
+
+    public String getSslTruststoreLocation() {
+        return sslTruststoreLocation;
+    }
+
+    public String getSecurityProtocol() {
+        return securityProtocol;
+    }
+
+public void setSecurityProtocol(String 

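Taken together, the patch turns the SSL settings into ordinary bean properties next to brokerList and topic. A sketch of wiring the appender programmatically with the new options; the broker address, paths and passwords are placeholders, and the setter names are inferred from the getters shown above:

import org.apache.kafka.log4jappender.KafkaLog4jAppender;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

public class SslAppenderSketch {
    public static void main(String[] args) {
        KafkaLog4jAppender appender = new KafkaLog4jAppender();
        appender.setLayout(new PatternLayout("%m"));
        appender.setBrokerList("broker1:9093");                                 // placeholder
        appender.setTopic("app-logs");                                          // placeholder
        appender.setSecurityProtocol("SSL");
        appender.setSslTruststoreLocation("/etc/kafka/client.truststore.jks");  // placeholder
        appender.setSslTruststorePassword("changeit");                          // placeholder
        appender.activateOptions();  // builds the underlying KafkaProducer

        Logger logger = Logger.getLogger(SslAppenderSketch.class);
        logger.addAppender(appender);
        logger.info("hello over SSL");
    }
}

In a log4j.properties file the same values would be set through the matching bean names, e.g. log4j.appender.KAFKA.SecurityProtocol=SSL.
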
kafka git commit: HOTFIX: call consumer.poll() even when no task is assigned

2015-10-27 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 38a1b6055 -> af42c3789


HOTFIX: call consumer.poll() even when no task is assigned

StreamThread should keep calling consumer.poll() even when no task is assigned. This is necessary to get a task.

guozhangwang

Author: Yasuhiro Matsuda 

Reviewers: Guozhang Wang

Closes #373 from ymatsuda/no_task


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/af42c378
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/af42c378
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/af42c378

Branch: refs/heads/trunk
Commit: af42c37899e8fc66590e7f3c4893f8224441f6a8
Parents: 38a1b60
Author: Yasuhiro Matsuda 
Authored: Tue Oct 27 13:57:19 2015 -0700
Committer: Guozhang Wang 
Committed: Tue Oct 27 13:57:19 2015 -0700

--
 .../processor/internals/StreamThread.java   | 25 +---
 1 file changed, 16 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/af42c378/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index abc5c5d..0bf51d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -265,22 +265,29 @@ public class StreamThread extends Thread {
 sensors.pollTimeSensor.record(endPoll - startPoll);
 }
 
-    // try to process one record from each task
     totalNumBuffered = 0;
-    requiresPoll = false;
 
-    for (StreamTask task : tasks.values()) {
-        long startProcess = time.milliseconds();
+    if (!tasks.isEmpty()) {
+        // try to process one record from each task
+        requiresPoll = false;
 
-        totalNumBuffered += task.process();
-        requiresPoll = requiresPoll || task.requiresPoll();
+        for (StreamTask task : tasks.values()) {
+            long startProcess = time.milliseconds();
 
-        sensors.processTimeSensor.record(time.milliseconds() - startProcess);
+            totalNumBuffered += task.process();
+            requiresPoll = requiresPoll || task.requiresPoll();
+
+            sensors.processTimeSensor.record(time.milliseconds() - startProcess);
+        }
+
+        maybePunctuate();
+        maybeCommit();
+    } else {
+        // even when no task is assigned, we must poll to get a task.
+        requiresPoll = true;
     }
 
-    maybePunctuate();
     maybeClean();
-    maybeCommit();
 }
 } catch (Exception e) {
 throw new KafkaException(e);


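The premise generalizes to any rebalance-driven loop: partition assignments are only delivered inside poll(), so a thread with no assigned work must keep polling or it will never receive one. A minimal sketch of that loop shape with placeholder connection settings (not StreamThread itself):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollEvenWhenIdle {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "stream-sketch");            // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic"));  // placeholder
            boolean hasWork = false;
            while (true) {
                // Skipping poll() while idle would mean never getting an
                // assignment: the group protocol runs inside poll().
                ConsumerRecords<byte[], byte[]> records = consumer.poll(hasWork ? 0 : 100);
                hasWork = !consumer.assignment().isEmpty();
                // ... process records, one buffered record per task, as StreamThread does ...
            }
        }
    }
}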

[2/2] kafka git commit: KAFKA-1888: rolling upgrade test

2015-10-27 Thread guozhang
KAFKA-1888: rolling upgrade test

ewencp gwenshap
This needs some refactoring to avoid the duplicated code between replication test and upgrade test, but it is in shape for initial feedback.

I'm interested in feedback on the added `KafkaConfig` class and `kafka_props` file. This addition makes it:
- easier to attach different configs to different nodes (e.g. during broker upgrade process)
- easier to reason about the configuration of a particular node

Notes:
- in the default values in the KafkaConfig class, I removed many properties which were in kafka.properties before. This is because most of those properties were set to what is already the default value.
- when running non-trunk VerifiableProducer, I append the trunk tools jar to the classpath, and run it with the non-trunk kafka-run-class.sh script

Author: Geoff Anderson 

Reviewers: Dong Lin, Ewen Cheslack-Postava

Closes #229 from granders/KAFKA-1888-upgrade-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e6b34330
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e6b34330
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e6b34330

Branch: refs/heads/trunk
Commit: e6b343302f3208f7f6e0099fe2a7132ef9eaaafb
Parents: af42c37
Author: Geoff Anderson 
Authored: Tue Oct 27 15:23:47 2015 -0700
Committer: Guozhang Wang 
Committed: Tue Oct 27 15:23:47 2015 -0700

--
 tests/kafkatest/__init__.py |  10 +
 .../sanity_checks/test_console_consumer.py  |  43 ++-
 .../sanity_checks/test_kafka_version.py |  55 
 .../sanity_checks/test_verifiable_producer.py   |  70 +
 tests/kafkatest/services/console_consumer.py| 109 ---
 tests/kafkatest/services/copycat.py |   9 +-
 tests/kafkatest/services/kafka.py   | 253 
 tests/kafkatest/services/kafka/__init__.py  |  16 +
 tests/kafkatest/services/kafka/config.py|  53 
 .../kafkatest/services/kafka/config_property.py | 177 +++
 tests/kafkatest/services/kafka/directory.py |  32 ++
 tests/kafkatest/services/kafka/kafka.py | 303 +++
 .../services/kafka/templates/kafka.properties   |  65 
 tests/kafkatest/services/kafka/version.py   |  61 
 .../kafkatest/services/kafka_log4j_appender.py  |  13 +-
 tests/kafkatest/services/mirror_maker.py|   5 +-
 tests/kafkatest/services/monitor/__init__.py|  14 +
 tests/kafkatest/services/monitor/jmx.py |  90 ++
 .../performance/consumer_performance.py |   8 +-
 .../services/performance/end_to_end_latency.py  |   8 +-
 .../kafkatest/services/performance/jmx_mixin.py |  81 -
 .../performance/producer_performance.py |  44 +--
 .../services/templates/kafka.properties |  74 -
 tests/kafkatest/services/verifiable_producer.py |  88 +-
 tests/kafkatest/services/zookeeper.py   |   8 +-
 .../kafkatest/tests/produce_consume_validate.py | 106 +++
 tests/kafkatest/tests/quota_test.py |  20 +-
 tests/kafkatest/tests/replication_test.py   | 207 +
 tests/kafkatest/tests/upgrade_test.py   |  81 +
 tests/kafkatest/utils/__init__.py   |   4 +-
 tests/kafkatest/utils/util.py   |  42 +++
 tests/setup.py  |  10 +-
 .../apache/kafka/tools/VerifiableProducer.java  |  30 +-
 vagrant/base.sh |  26 +-
 34 files changed, 1561 insertions(+), 654 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/__init__.py
--
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index 28d269b..e346811 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -14,3 +14,13 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
 
+# This determines the version of kafkatest that can be published to PyPi and installed with pip
+#
+# Note that in development, this version name can't follow Kafka's convention of having a trailing "-SNAPSHOT"
+# due to python version naming restrictions, which are enforced by python packaging tools
+# (see  https://www.python.org/dev/peps/pep-0440/)
+#
+# Instead, in trunk, the version should have a suffix of the form ".devN"
+#
+# For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be something like "0.9.0.0.dev0"
+__version__ = '0.9.0.0.dev0'

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/sanity_checks/test_console_consumer.py
--
diff --git 

[1/2] kafka git commit: KAFKA-1888: rolling upgrade test

2015-10-27 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk af42c3789 -> e6b343302


http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/performance/jmx_mixin.py
--
diff --git a/tests/kafkatest/services/performance/jmx_mixin.py b/tests/kafkatest/services/performance/jmx_mixin.py
deleted file mode 100644
index 7e19839..000
--- a/tests/kafkatest/services/performance/jmx_mixin.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-class JmxMixin(object):
-
-    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=[]):
-        self.jmx_object_names = jmx_object_names
-        self.jmx_attributes = jmx_attributes
-        self.jmx_port = 9192
-
-        self.started = [False] * num_nodes
-        self.jmx_stats = [{} for x in range(num_nodes)]
-        self.maximum_jmx_value = {}  # map from object_attribute_name to maximum value observed over time
-        self.average_jmx_value = {}  # map from object_attribute_name to average value observed over time
-
-    def clean_node(self, node):
-        node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False)
-
-    def start_jmx_tool(self, idx, node):
-        if self.started[idx-1] == True or self.jmx_object_names == None:
-            return
-        self.started[idx-1] = True
-
-        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool " \
-              "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
-        for jmx_object_name in self.jmx_object_names:
-            cmd += " --object-name %s" % jmx_object_name
-        for jmx_attribute in self.jmx_attributes:
-            cmd += " --attributes %s" % jmx_attribute
-        cmd += " | tee -a /mnt/jmx_tool.log"
-
-        self.logger.debug("Start JmxTool %d command: %s", idx, cmd)
-        jmx_output = node.account.ssh_capture(cmd, allow_fail=False)
-        jmx_output.next()
-
-    def read_jmx_output(self, idx, node):
-        if self.started[idx-1] == False:
-            return
-        self.maximum_jmx_value = {}
-        self.average_jmx_value = {}
-        object_attribute_names = []
-
-        cmd = "cat /mnt/jmx_tool.log"
-        self.logger.debug("Read jmx output %d command: %s", idx, cmd)
-        for line in node.account.ssh_capture(cmd, allow_fail=False):
-            if "time" in line:
-                object_attribute_names = line.strip()[1:-1].split("\",\"")[1:]
-                continue
-            stats = [float(field) for field in line.split(',')]
-            time_sec = int(stats[0]/1000)
-            self.jmx_stats[idx-1][time_sec] = {name : stats[i+1] for i, name in enumerate(object_attribute_names)}
-
-        # do not calculate average and maximum of jmx stats until we have read output from all nodes
-        if any(len(time_to_stats)==0 for time_to_stats in self.jmx_stats):
-            return
-
-        start_time_sec = min([min(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
-        end_time_sec = max([max(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
-
-        for name in object_attribute_names:
-            aggregates_per_time = []
-            for time_sec in xrange(start_time_sec, end_time_sec+1):
-                # assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth
-                values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats]
-                # assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth
-                aggregates_per_time.append(sum(values_per_node))
-            self.average_jmx_value[name] = sum(aggregates_per_time)/len(aggregates_per_time)
-            self.maximum_jmx_value[name] = max(aggregates_per_time)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/performance/producer_performance.py
--
diff 

kafka git commit: KAFKA-2677: ensure consumer sees coordinator disconnects

2015-10-27 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk e6b343302 -> 0b05d3b93


KAFKA-2677: ensure consumer sees coordinator disconnects

Author: Jason Gustafson 

Reviewers: Ismael Juma, Guozhang Wang

Closes #349 from hachikuji/KAFKA-2677


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0b05d3b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0b05d3b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0b05d3b9

Branch: refs/heads/trunk
Commit: 0b05d3b939c5ed37a4253e7c3614d824e76ed664
Parents: e6b3433
Author: Jason Gustafson 
Authored: Tue Oct 27 16:24:10 2015 -0700
Committer: Guozhang Wang 
Committed: Tue Oct 27 16:24:10 2015 -0700

--
 .../org/apache/kafka/clients/NetworkClient.java | 37 ---
 .../consumer/internals/AbstractCoordinator.java | 14 +++-
 .../internals/ConsumerNetworkClient.java| 70 ++--
 .../org/apache/kafka/clients/MockClient.java| 15 +++--
 4 files changed, 83 insertions(+), 53 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/0b05d3b9/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4265004..2c56751 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -12,16 +12,6 @@
  */
 package org.apache.kafka.clients;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.network.NetworkReceive;
@@ -40,6 +30,16 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
 /**
  * A network client for asynchronous request/response network i/o. This is an internal class used to implement the
  * user-facing producer and consumer clients.
@@ -58,6 +58,7 @@ public class NetworkClient implements KafkaClient {
     /* a list of nodes we've connected to in the past */
     private final List<Node> nodesEverSeen;
     private final Map<Integer, Node> nodesEverSeenById;
+
 /* random offset into nodesEverSeen list */
 private final Random randOffset;
 
@@ -234,16 +235,6 @@ public class NetworkClient implements KafkaClient {
 }
 
 /**
- * Return the state of the connection to the given node
- *
- * @param node The node to check
- * @return The connection state
- */
-public ConnectionState connectionState(String node) {
-return connectionStates.connectionState(node);
-}
-
-/**
      * Queue up the given request for sending. Requests can only be sent out to ready nodes.
  *
  * @param request The request
@@ -275,7 +266,6 @@ public class NetworkClient implements KafkaClient {
     @Override
     public List<ClientResponse> poll(long timeout, long now) {
         long metadataTimeout = metadataUpdater.maybeUpdate(now);
-        long updatedNow = now;
         try {
             this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
         } catch (IOException e) {
@@ -283,7 +273,7 @@ public class NetworkClient implements KafkaClient {
         }
 
         // process completed actions
-        updatedNow = this.time.milliseconds();
+        long updatedNow = this.time.milliseconds();
         List<ClientResponse> responses = new ArrayList<>();
         handleCompletedSends(responses, updatedNow);
         handleCompletedReceives(responses, updatedNow);
@@ -612,9 +602,8 @@ public class NetworkClient implements KafkaClient {
      * @param nodes Current alive nodes
      */
     private void updateNodesEverSeen(List<Node> nodes) {
-        Node existing = null;
         for (Node n : nodes) {
-            existing = nodesEverSeenById.get(n.id());
+            Node existing = nodesEverSeenById.get(n.id());
             if (existing == null) {
                 nodesEverSeenById.put(n.id(), n);
                 log.debug("Adding node {} to nodes ever seen", n.id());


kafka git commit: HOTFIX: correct sourceNodes for kstream.through()

2015-10-27 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 0b05d3b93 -> 13c3e049f


HOTFIX: correct sourceNodes for kstream.through()

guozhangwang

Author: Yasuhiro Matsuda 

Reviewers: Guozhang Wang

Closes #374 from ymatsuda/fix_through_operator


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13c3e049
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13c3e049
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13c3e049

Branch: refs/heads/trunk
Commit: 13c3e049fbf22522c90c2a0b4b4f680b974d9bea
Parents: 0b05d3b
Author: Yasuhiro Matsuda 
Authored: Tue Oct 27 16:26:47 2015 -0700
Committer: Guozhang Wang 
Committed: Tue Oct 27 16:26:47 2015 -0700

--
 .../org/apache/kafka/streams/kstream/internals/KStreamImpl.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/13c3e049/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 404193a..1a2297c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -178,7 +178,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
         topology.addSource(sourceName, keyDeserializer, valDeserializer, topic);
 
-        return new KStreamImpl<>(topology, sourceName, Collections.emptySet());
+        return new KStreamImpl<>(topology, sourceName, Collections.singleton(sourceName));
 }
 
 @Override



kafka git commit: KAFKA-2683: ensure wakeup exceptions raised to user

2015-10-27 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 13c3e049f -> 1ac2640f8


KAFKA-2683: ensure wakeup exceptions raised to user

Author: Jason Gustafson 

Reviewers: Ewen Cheslack-Postava, Guozhang Wang

Closes #366 from hachikuji/KAFKA-2683


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ac2640f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ac2640f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ac2640f

Branch: refs/heads/trunk
Commit: 1ac2640f8095262f423c770060b737f81652e211
Parents: 13c3e04
Author: Jason Gustafson 
Authored: Tue Oct 27 17:39:19 2015 -0700
Committer: Guozhang Wang 
Committed: Tue Oct 27 17:39:19 2015 -0700

--
 .../consumer/ConsumerWakeupException.java   | 20 ---
 .../kafka/clients/consumer/KafkaConsumer.java   | 26 ++--
 .../kafka/clients/consumer/MockConsumer.java|  3 ++-
 .../consumer/internals/AbstractCoordinator.java | 26 +++-
 .../consumer/internals/ConsumerCoordinator.java | 10 +---
 .../internals/ConsumerNetworkClient.java| 10 
 .../kafka/common/errors/WakeupException.java| 26 
 .../internals/ConsumerNetworkClientTest.java|  4 +--
 .../kafka/copycat/runtime/WorkerSinkTask.java   |  3 ++-
 .../runtime/distributed/DistributedHerder.java  |  6 ++---
 .../runtime/distributed/WorkerGroupMember.java  |  2 +-
 .../kafka/copycat/util/KafkaBasedLog.java   | 10 
 12 files changed, 81 insertions(+), 65 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
deleted file mode 100644
index 35f1ec9..000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import org.apache.kafka.common.KafkaException;
-
-public class ConsumerWakeupException extends KafkaException {
-    private static final long serialVersionUID = 1L;
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 06a9239..7aef8a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -313,9 +313,9 @@ import java.util.regex.Pattern;
  *
  * 
  * The only exception to this rule is {@link #wakeup()}, which can safely be used from an external thread to
- * interrupt an active operation. In this case, a {@link ConsumerWakeupException} will be thrown from the thread
- * blocking on the operation. This can be used to shutdown the consumer from another thread. The following
- * snippet shows the typical pattern:
+ * interrupt an active operation. In this case, a {@link org.apache.kafka.common.errors.WakeupException} will be
+ * thrown from the thread blocking on the operation. This can be used to shutdown the consumer from another thread.
+ * The following snippet shows the typical pattern:
  *
  * 
  * public class KafkaConsumerRunner implements Runnable {
@@ -329,7 +329,7 @@ import java.util.regex.Pattern;
  * ConsumerRecords records = consumer.poll(1);
  * // Handle new records
  * }
- * } catch (ConsumerWakeupException e) {
+ * } catch (WakeupException e) {
  *   

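The snippet above is cut off by the archive; the full pattern it refers to looks like the following sketch (topic name and record handling are placeholders; props must carry the usual bootstrap/group/deserializer settings):

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;

    public KafkaConsumerRunner(Properties props) {
        this.consumer = new KafkaConsumer<>(props);
    }

    public void run() {
        try {
            consumer.subscribe(Collections.singletonList("topic"));  // placeholder
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // handle new records
            }
        } catch (WakeupException e) {
            if (!closed.get()) throw e;  // ignore the exception only if we are closing
        } finally {
            consumer.close();
        }
    }

    // Safe to call from another thread: flags shutdown, then interrupts poll().
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}
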
kafka git commit: MINOR: Expose ReplicaManager gauges

2015-10-27 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 1ac2640f8 -> 7a36d3647


MINOR: Expose ReplicaManager gauges

There are several gauges in core that are registered but cannot be accessed programmatically. For example, the gauges "LeaderCount", "PartitionCount" and "UnderReplicatedPartitions" are all registered in ReplicaManager.scala, but there is no way to access them programmatically even if one has access to the kafka.server object. Other metrics, such as isrExpandRate (also in ReplicaManager.scala), can be accessed. The solution here is trivial: assign the result of newGauge to a val, as shown below

val partitionCount = newGauge(
  "PartitionCount",
  new Gauge[Int] {
    def value = allPartitions.size
  }
)

Author: Eno Thereska 

Reviewers: Ismael Juma, Guozhang Wang

Closes #364 from enothereska/gauges


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a36d364
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a36d364
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a36d364

Branch: refs/heads/trunk
Commit: 7a36d36478635ae16b64c6410b88c92b45d5f129
Parents: 1ac2640
Author: Eno Thereska 
Authored: Tue Oct 27 17:55:59 2015 -0700
Committer: Guozhang Wang 
Committed: Tue Oct 27 17:55:59 2015 -0700

--
 core/src/main/scala/kafka/server/ReplicaManager.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/7a36d364/core/src/main/scala/kafka/server/ReplicaManager.scala
--
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1fc47f4..0413b1a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -122,7 +122,7 @@ class ReplicaManager(val config: KafkaConfig,
   val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
     purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
 
-  newGauge(
+  val leaderCount = newGauge(
     "LeaderCount",
     new Gauge[Int] {
       def value = {
@@ -130,13 +130,13 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
   )
-  newGauge(
+  val partitionCount = newGauge(
     "PartitionCount",
     new Gauge[Int] {
       def value = allPartitions.size
     }
   )
-  newGauge(
+  val underReplicatedPartitions = newGauge(
     "UnderReplicatedPartitions",
     new Gauge[Int] {
       def value = underReplicatedPartitionCount()
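
For reference, the same pattern in Java against the Yammer 2.x API that core uses: keeping the value newGauge returns preserves registration while making the gauge readable in-process. A sketch under those assumptions, not code from the patch:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;

public class GaugeExposureSketch {
    private final Map<String, Object> allPartitions = new ConcurrentHashMap<>();

    // Assigning the result of newGauge to a field is the whole fix: the gauge
    // is still registered with the metrics registry, but holders of this object
    // can now also read partitionCount.value() directly.
    public final Gauge<Integer> partitionCount = Metrics.newGauge(
        GaugeExposureSketch.class, "PartitionCount",
        new Gauge<Integer>() {
            @Override
            public Integer value() {
                return allPartitions.size();
            }
        });

    public static void main(String[] args) {
        GaugeExposureSketch sketch = new GaugeExposureSketch();
        System.out.println("PartitionCount = " + sketch.partitionCount.value());
    }
}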