kafka git commit: kafka-2205; Generalize TopicConfigManager to handle multiple entity configs; patched by Aditya Auradkar; reviewed by Jun Rao

2015-08-04 Thread junrao
Repository: kafka
Updated Branches:
  refs/heads/trunk 1a0179f21 - a56a79055


kafka-2205; Generalize TopicConfigManager to handle multiple entity configs; 
patched by Aditya Auradkar; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a56a7905
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a56a7905
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a56a7905

Branch: refs/heads/trunk
Commit: a56a79055dfba4687f476b0a4d20aeec1c4ebff7
Parents: 1a0179f
Author: Aditya Auradkar aaurad...@linkedin.com
Authored: Tue Aug 4 15:11:27 2015 -0700
Committer: Jun Rao jun...@gmail.com
Committed: Tue Aug 4 15:11:27 2015 -0700

--
 bin/kafka-configs.sh|  17 ++
 .../src/main/scala/kafka/admin/AdminUtils.scala |  77 +---
 .../main/scala/kafka/admin/ConfigCommand.scala  | 174 ++
 .../main/scala/kafka/admin/TopicCommand.scala   |  39 +---
 .../main/scala/kafka/cluster/Partition.scala|   5 +-
 .../kafka/controller/KafkaController.scala  |   7 +-
 .../controller/PartitionLeaderSelector.scala|   6 +-
 .../kafka/controller/TopicDeletionManager.scala |   5 +-
 .../main/scala/kafka/server/ConfigHandler.scala |  69 +++
 .../kafka/server/DynamicConfigManager.scala | 183 +++
 .../main/scala/kafka/server/KafkaServer.scala   |  15 +-
 .../kafka/server/ReplicaFetcherThread.scala |   4 +-
 .../scala/kafka/server/TopicConfigManager.scala | 152 ---
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  33 +++-
 .../test/scala/unit/kafka/admin/AdminTest.scala |   8 +-
 .../unit/kafka/admin/ConfigCommandTest.scala|  73 
 .../unit/kafka/admin/TopicCommandTest.scala |  13 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |  83 -
 18 files changed, 718 insertions(+), 245 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/bin/kafka-configs.sh
--
diff --git a/bin/kafka-configs.sh b/bin/kafka-configs.sh
new file mode 100755
index 000..417eaf5
--- /dev/null
+++ b/bin/kafka-configs.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConfigCommand $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/admin/AdminUtils.scala
--
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala 
b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 4cc2376..990 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -21,6 +21,7 @@ import kafka.common._
 import kafka.cluster.{BrokerEndPoint, Broker}
 
 import kafka.log.LogConfig
+import kafka.server.ConfigType
 import kafka.utils._
 import kafka.api.{TopicMetadata, PartitionMetadata}
 
@@ -40,10 +41,8 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
 
 object AdminUtils extends Logging {
   val rand = new Random
-
   val AdminClientId = __admin_client
-
-  val TopicConfigChangeZnodePrefix = config_change_
+  val EntityConfigChangeZnodePrefix = config_change_
 
   /**
* There are 2 goals of replica assignment:
@@ -103,14 +102,12 @@ object AdminUtils extends Logging {
   * @param numPartitions Number of partitions to be set
   * @param replicaAssignmentStr Manual replica assignment
   * @param checkBrokerAvailable Ignore checking if assigned replica broker is 
available. Only used for testing
-  * @param config Pre-existing properties that should be preserved
   */
   def addPartitions(zkClient: ZkClient,
 topic: String,
 numPartitions: Int = 1,
 replicaAssignmentStr: String = ,
-checkBrokerAvailable: Boolean = true,
-config: Properties = new Properties) {
+checkBrokerAvailable: Boolean = true) {
 val existingPartitionsReplicaList = 

kafka git commit: KAFKA-2405; Don't kill the JVM on session establishment failure

2015-08-04 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/trunk a56a79055 - 7a666f7aa


KAFKA-2405; Don't kill the JVM on session establishment failure

As noted in the JIRA (https://issues.apache.org/jira/browse/KAFKA-2405), the 
KafkaHealthcheck currently causes the JVM to terminate when session 
establishment with ZooKeeper fails. I don't know whether retrying (after a 
while) is a better way to fix this, but at least, IMO, a session establishment 
failure shouldn't kill the JVM. This commit removes the `System.exit()` call.

Author: Jaikiran Pai jaikiran@gmail.com

Reviewers: Gwen Shapira csh...@gmail.com

Closes #111 from jaikiran/kafka-2405 and squashes the following commits:

0255fdb [Jaikiran Pai] KAFKA-2405 Don't kill the JVM on session establishment 
failure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a666f7a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a666f7a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a666f7a

Branch: refs/heads/trunk
Commit: 7a666f7aa8b1be927579817187e0b5b93543b5e2
Parents: a56a790
Author: Jaikiran Pai jaikiran@gmail.com
Authored: Tue Aug 4 17:10:02 2015 -0700
Committer: Chen Shapira g...@chens-mbp.gateway.sonic.net
Committed: Tue Aug 4 17:10:02 2015 -0700

--
 core/src/main/scala/kafka/controller/KafkaController.scala | 2 +-
 core/src/main/scala/kafka/server/KafkaHealthcheck.scala| 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/7a666f7a/core/src/main/scala/kafka/controller/KafkaController.scala
--
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 6844602..b19e57f 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1138,7 +1138,7 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient, val brokerSt
 }
 
 override def handleSessionEstablishmentError(error: Throwable): Unit = {
-  //no-op handleSessionEstablishmentError in KafkaHealthCheck should 
System.exit and log the error.
+  //no-op handleSessionEstablishmentError in KafkaHealthCheck should 
handle this error in its handleSessionEstablishmentError
 }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a666f7a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
--
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index ea0c996..e6e270b 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -92,7 +92,6 @@ class KafkaHealthcheck(private val brokerId: Int,
 
 override def handleSessionEstablishmentError(error: Throwable): Unit = {
   fatal(Could not establish session with zookeeper, error)
-  System.exit(-1)
 }
   }
 



kafka git commit: KAFKA-2055; Fix transient ConsumerBounceTest.testSeekAndCommitWithBrokerFailure failure.

2015-08-04 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 3c0963084 - 4b400afce


KAFKA-2055; Fix transient ConsumerBounceTest.testSeekAndCommitWithBrokerFailure 
failure.

…kerFailures failure;

Author: lvfangmin lvfang...@gmail.com

Reviewers: Guozhang

Closes #98 from lvfangmin/KAFKA-2055 and squashes the following commits:

057a1f1 [lvfangmin] KAFKA-2055; Fix transient 
ConsumerBounceTest.testSeekAndCommitWithBrokerFailures failure;


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4b400afc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4b400afc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4b400afc

Branch: refs/heads/trunk
Commit: 4b400afceadcbe74e3ecbfeb7c3066fd436da192
Parents: 3c09630
Author: lvfangmin lvfang...@gmail.com
Authored: Tue Aug 4 11:29:10 2015 -0700
Committer: Guozhang Wang wangg...@gmail.com
Committed: Tue Aug 4 11:29:10 2015 -0700

--
 .../test/scala/integration/kafka/api/ConsumerBounceTest.scala   | 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/4b400afc/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
--
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 
b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index d8eee52..93f9468 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -104,6 +104,11 @@ class ConsumerBounceTest extends IntegrationTestHarness 
with Logging {
 consumer.subscribe(tp)
 consumer.seek(tp, 0)
 
+// wait until all the followers have synced the last HW with leader
+TestUtils.waitUntilTrue(() = servers.forall(server =
+  server.replicaManager.getReplica(tp.topic(), 
tp.partition()).get.highWatermark.messageOffset == numRecords
+), Failed to update high watermark for followers after timeout)
+
 val scheduler = new BounceBrokerScheduler(numIters)
 scheduler.start()
 



kafka git commit: HOTFIX; Encode/decode to utf-8 for commit title IO in kafka-merge-pr.py after KAFKA-2384

2015-08-04 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 4b400afce - 1a0179f21


HOTFIX; Encode/decode to utf-8 for commit title IO in kafka-merge-pr.py after 
KAFKA-2384

This fix should be fine for Linux and OS X. Not sure about Windows though. This 
is a very specific fix for new functionality added in KAFKA-2384. There are 
other places where a similar error could occur, but they are less likely.

The script doesn't really support Unicode input at the moment.

Author: Ismael Juma ism...@juma.me.uk

Reviewers: Guozhang

Closes #109 from ijuma/kafka-2384-hotfix and squashes the following commits:

0ab8958 [Ismael Juma] Encode/decode to utf-8 for commit title IO in 
kafka-merge-pr.py


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a0179f2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a0179f2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a0179f2

Branch: refs/heads/trunk
Commit: 1a0179f21a3937065fea2b745de67967bcafcb68
Parents: 4b400af
Author: Ismael Juma ism...@juma.me.uk
Authored: Tue Aug 4 11:33:03 2015 -0700
Committer: Guozhang Wang wangg...@gmail.com
Committed: Tue Aug 4 11:33:03 2015 -0700

--
 kafka-merge-pr.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/1a0179f2/kafka-merge-pr.py
--
diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py
index 576d315..5e8cbf5 100644
--- a/kafka-merge-pr.py
+++ b/kafka-merge-pr.py
@@ -381,7 +381,7 @@ def main():
 url = pr[url]
 
 pr_title = pr[title]
-commit_title = raw_input(Commit title [%s]:  % pr_title)
+commit_title = raw_input(Commit title [%s]:  % 
pr_title.encode(utf-8)).decode(utf-8)
 if commit_title == :
 commit_title = pr_title
 



kafka git commit: KAFKA-2288; Follow-up to KAFKA-2249 - reduce logging and testing; Reviewed by Jun Rao

2015-08-04 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/trunk 7a666f7aa - 9cefb2a0f


KAFKA-2288; Follow-up to KAFKA-2249 - reduce logging and testing; Reviewed by 
Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9cefb2a0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9cefb2a0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9cefb2a0

Branch: refs/heads/trunk
Commit: 9cefb2a0fb7852d35cfe0f051bc6eadb8e9c4c80
Parents: 7a666f7
Author: Gwen Shapira csh...@gmail.com
Authored: Tue Aug 4 19:04:58 2015 -0700
Committer: Gwen Shapira csh...@gmail.com
Committed: Tue Aug 4 19:04:58 2015 -0700

--
 .../kafka/common/config/AbstractConfig.java |  23 +-
 core/src/main/scala/kafka/log/LogConfig.scala   |   2 +-
 .../scala/unit/kafka/log/LogConfigTest.scala|  22 -
 .../kafka/server/KafkaConfigConfigDefTest.scala | 403 ---
 .../unit/kafka/server/KafkaConfigTest.scala | 154 ++-
 5 files changed, 175 insertions(+), 429 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/9cefb2a0/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index ec3ae15..6c31748 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -39,7 +39,7 @@ public class AbstractConfig {
 private final MapString, Object values;
 
 @SuppressWarnings(unchecked)
-public AbstractConfig(ConfigDef definition, Map?, ? originals) {
+public AbstractConfig(ConfigDef definition, Map?, ? originals, Boolean 
doLog) {
 /* check that all the keys are really strings */
 for (Object key : originals.keySet())
 if (!(key instanceof String))
@@ -47,7 +47,12 @@ public class AbstractConfig {
 this.originals = (MapString, ?) originals;
 this.values = definition.parse(this.originals);
 this.used = Collections.synchronizedSet(new HashSetString());
-logAll();
+if (doLog)
+logAll();
+}
+
+public AbstractConfig(ConfigDef definition, Map?, ? originals) {
+this(definition, originals, true);
 }
 
 protected Object get(String key) {
@@ -167,4 +172,18 @@ public class AbstractConfig {
 return objects;
 }
 
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+
+AbstractConfig that = (AbstractConfig) o;
+
+return originals.equals(that.originals);
+}
+
+@Override
+public int hashCode() {
+return originals.hashCode();
+}
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cefb2a0/core/src/main/scala/kafka/log/LogConfig.scala
--
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala 
b/core/src/main/scala/kafka/log/LogConfig.scala
index fc41132..c969d16 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -46,7 +46,7 @@ object Defaults {
   val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable
 }
 
-case class LogConfig(props: java.util.Map[_, _]) extends 
AbstractConfig(LogConfig.configDef, props) {
+case class LogConfig(props: java.util.Map[_, _]) extends 
AbstractConfig(LogConfig.configDef, props, false) {
 
   val segmentSize = getInt(LogConfig.SegmentBytesProp)
   val segmentMs = getLong(LogConfig.SegmentMsProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cefb2a0/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
--
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 19dcb47..72e98b3 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -33,28 +33,6 @@ class LogConfigTest extends JUnit3Suite {
   }
 
   @Test
-  def testFromPropsToProps() {
-import scala.util.Random._
-val expected = new Properties()
-LogConfig.configNames().foreach((name) = {
-  name match {
-case LogConfig.UncleanLeaderElectionEnableProp = 
expected.setProperty(name, randFrom(true, false))
-case LogConfig.CompressionTypeProp = expected.setProperty(name, 
randFrom(producer, uncompressed, gzip))
-case LogConfig.CleanupPolicyProp = expected.setProperty(name, 
randFrom(LogConfig.Compact, LogConfig.Delete))
-