kafka git commit: kafka-2205; Generalize TopicConfigManager to handle multiple entity configs; patched by Aditya Auradkar; reviewed Jun Rao
Repository: kafka Updated Branches: refs/heads/trunk 1a0179f21 - a56a79055 kafka-2205; Generalize TopicConfigManager to handle multiple entity configs; patched by Aditya Auradkar; reviewed Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a56a7905 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a56a7905 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a56a7905 Branch: refs/heads/trunk Commit: a56a79055dfba4687f476b0a4d20aeec1c4ebff7 Parents: 1a0179f Author: Aditya Auradkar aaurad...@linkedin.com Authored: Tue Aug 4 15:11:27 2015 -0700 Committer: Jun Rao jun...@gmail.com Committed: Tue Aug 4 15:11:27 2015 -0700 -- bin/kafka-configs.sh| 17 ++ .../src/main/scala/kafka/admin/AdminUtils.scala | 77 +--- .../main/scala/kafka/admin/ConfigCommand.scala | 174 ++ .../main/scala/kafka/admin/TopicCommand.scala | 39 +--- .../main/scala/kafka/cluster/Partition.scala| 5 +- .../kafka/controller/KafkaController.scala | 7 +- .../controller/PartitionLeaderSelector.scala| 6 +- .../kafka/controller/TopicDeletionManager.scala | 5 +- .../main/scala/kafka/server/ConfigHandler.scala | 69 +++ .../kafka/server/DynamicConfigManager.scala | 183 +++ .../main/scala/kafka/server/KafkaServer.scala | 15 +- .../kafka/server/ReplicaFetcherThread.scala | 4 +- .../scala/kafka/server/TopicConfigManager.scala | 152 --- core/src/main/scala/kafka/utils/ZkUtils.scala | 33 +++- .../test/scala/unit/kafka/admin/AdminTest.scala | 8 +- .../unit/kafka/admin/ConfigCommandTest.scala| 73 .../unit/kafka/admin/TopicCommandTest.scala | 13 +- .../kafka/server/DynamicConfigChangeTest.scala | 83 - 18 files changed, 718 insertions(+), 245 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/bin/kafka-configs.sh -- diff --git a/bin/kafka-configs.sh b/bin/kafka-configs.sh new file mode 100755 index 000..417eaf5 --- /dev/null +++ b/bin/kafka-configs.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache 
Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConfigCommand $@ http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/admin/AdminUtils.scala -- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 4cc2376..990 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -21,6 +21,7 @@ import kafka.common._ import kafka.cluster.{BrokerEndPoint, Broker} import kafka.log.LogConfig +import kafka.server.ConfigType import kafka.utils._ import kafka.api.{TopicMetadata, PartitionMetadata} @@ -40,10 +41,8 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException object AdminUtils extends Logging { val rand = new Random - val AdminClientId = __admin_client - - val TopicConfigChangeZnodePrefix = config_change_ + val EntityConfigChangeZnodePrefix = config_change_ /** * There are 2 goals of replica assignment: @@ -103,14 +102,12 @@ object AdminUtils extends Logging { * @param numPartitions Number of partitions to be set * @param replicaAssignmentStr Manual replica assignment * @param checkBrokerAvailable Ignore checking if assigned replica broker is available. 
Only used for testing - * @param config Pre-existing properties that should be preserved */ def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = , -checkBrokerAvailable: Boolean = true, -config: Properties = new Properties) { +checkBrokerAvailable: Boolean = true) { val existingPartitionsReplicaList =
kafka git commit: KAFKA-2405; Don't kill the JVM on session establishment failure
Repository: kafka Updated Branches: refs/heads/trunk a56a79055 - 7a666f7aa KAFKA-2405; Don't kill the JVM on session establishment failure As noted in the JIRA https://issues.apache.org/jira/browse/KAFKA-2405 currently the KafkaHealthCheck causes the JVM to terminate in cases where session establishment with Zookeeper fails. I don't know if retrying (after a while) is a better way to fix this but at least, IMO, the session establishment failure shouldn't kill the JVM. This commit removes the `System.exit()` call. Author: Jaikiran Pai jaikiran@gmail.com Reviewers: Gwen Shapira csh...@gmail.com Closes #111 from jaikiran/kafka-2405 and squashes the following commits: 0255fdb [Jaikiran Pai] KAFKA-2405 Don't kill the JVM on session establishment failure Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a666f7a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a666f7a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a666f7a Branch: refs/heads/trunk Commit: 7a666f7aa8b1be927579817187e0b5b93543b5e2 Parents: a56a790 Author: Jaikiran Pai jaikiran@gmail.com Authored: Tue Aug 4 17:10:02 2015 -0700 Committer: Chen Shapira g...@chens-mbp.gateway.sonic.net Committed: Tue Aug 4 17:10:02 2015 -0700 -- core/src/main/scala/kafka/controller/KafkaController.scala | 2 +- core/src/main/scala/kafka/server/KafkaHealthcheck.scala| 1 - 2 files changed, 1 insertion(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/7a666f7a/core/src/main/scala/kafka/controller/KafkaController.scala -- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 6844602..b19e57f 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1138,7 +1138,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt } override def 
handleSessionEstablishmentError(error: Throwable): Unit = { - //no-op handleSessionEstablishmentError in KafkaHealthCheck should System.exit and log the error. + //no-op handleSessionEstablishmentError in KafkaHealthCheck should handle this error in its handleSessionEstablishmentError } } http://git-wip-us.apache.org/repos/asf/kafka/blob/7a666f7a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala -- diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index ea0c996..e6e270b 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -92,7 +92,6 @@ class KafkaHealthcheck(private val brokerId: Int, override def handleSessionEstablishmentError(error: Throwable): Unit = { fatal(Could not establish session with zookeeper, error) - System.exit(-1) } }
kafka git commit: KAFKA-2055; Fix transient ConsumerBounceTest.testSeekAndCommitWithBrokerFailure failure.
Repository: kafka Updated Branches: refs/heads/trunk 3c0963084 - 4b400afce KAFKA-2055; Fix transient ConsumerBounceTest.testSeekAndCommitWithBrokerFailure failure. …kerFailures failure; Author: lvfangmin lvfang...@gmail.com Reviewers: Guozhang Closes #98 from lvfangmin/KAFKA-2055 and squashes the following commits: 057a1f1 [lvfangmin] KAFKA-2055; Fix transient ConsumerBounceTest.testSeekAndCommitWithBrokerFailures failure; Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4b400afc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4b400afc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4b400afc Branch: refs/heads/trunk Commit: 4b400afceadcbe74e3ecbfeb7c3066fd436da192 Parents: 3c09630 Author: lvfangmin lvfang...@gmail.com Authored: Tue Aug 4 11:29:10 2015 -0700 Committer: Guozhang Wang wangg...@gmail.com Committed: Tue Aug 4 11:29:10 2015 -0700 -- .../test/scala/integration/kafka/api/ConsumerBounceTest.scala | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/4b400afc/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala -- diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index d8eee52..93f9468 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -104,6 +104,11 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { consumer.subscribe(tp) consumer.seek(tp, 0) +// wait until all the followers have synced the last HW with leader +TestUtils.waitUntilTrue(() = servers.forall(server = + server.replicaManager.getReplica(tp.topic(), tp.partition()).get.highWatermark.messageOffset == numRecords +), Failed to update high watermark for followers after timeout) + val scheduler = new BounceBrokerScheduler(numIters) 
scheduler.start()
kafka git commit: HOTFIX; Encode/decode to utf-8 for commit title IO in kafka-merge-pr.py after KAFKA-2384
Repository: kafka Updated Branches: refs/heads/trunk 4b400afce - 1a0179f21 HOTFIX; Encode/decode to utf-8 for commit title IO in kafka-merge-pr.py after KAFKA-2384 This fix should be fine for Linux and OS X. Not sure about Windows though. This is a very specific fix for new functionality added in KAFKA-2384. There are other places where a similar error could occur, but are less likely. The script doesn't really support Unicode input at the moment. Author: Ismael Juma ism...@juma.me.uk Reviewers: Guozhang Closes #109 from ijuma/kafka-2384-hotfix and squashes the following commits: 0ab8958 [Ismael Juma] Encode/decode to utf-8 for commit title IO in kafka-merge-pr.py Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a0179f2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a0179f2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a0179f2 Branch: refs/heads/trunk Commit: 1a0179f21a3937065fea2b745de67967bcafcb68 Parents: 4b400af Author: Ismael Juma ism...@juma.me.uk Authored: Tue Aug 4 11:33:03 2015 -0700 Committer: Guozhang Wang wangg...@gmail.com Committed: Tue Aug 4 11:33:03 2015 -0700 -- kafka-merge-pr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/1a0179f2/kafka-merge-pr.py -- diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py index 576d315..5e8cbf5 100644 --- a/kafka-merge-pr.py +++ b/kafka-merge-pr.py @@ -381,7 +381,7 @@ def main(): url = pr[url] pr_title = pr[title] -commit_title = raw_input(Commit title [%s]: % pr_title) +commit_title = raw_input(Commit title [%s]: % pr_title.encode(utf-8)).decode(utf-8) if commit_title == : commit_title = pr_title
kafka git commit: KAFKA-2288; Follow-up to KAFKA-2249 - reduce logging and testing; Reviewed by Jun Rao
Repository: kafka Updated Branches: refs/heads/trunk 7a666f7aa - 9cefb2a0f KAFKA-2288; Follow-up to KAFKA-2249 - reduce logging and testing; Reviewd by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9cefb2a0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9cefb2a0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9cefb2a0 Branch: refs/heads/trunk Commit: 9cefb2a0fb7852d35cfe0f051bc6eadb8e9c4c80 Parents: 7a666f7 Author: Gwen Shapira csh...@gmail.com Authored: Tue Aug 4 19:04:58 2015 -0700 Committer: Gwen Shapira csh...@gmail.com Committed: Tue Aug 4 19:04:58 2015 -0700 -- .../kafka/common/config/AbstractConfig.java | 23 +- core/src/main/scala/kafka/log/LogConfig.scala | 2 +- .../scala/unit/kafka/log/LogConfigTest.scala| 22 - .../kafka/server/KafkaConfigConfigDefTest.scala | 403 --- .../unit/kafka/server/KafkaConfigTest.scala | 154 ++- 5 files changed, 175 insertions(+), 429 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/9cefb2a0/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java -- diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index ec3ae15..6c31748 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -39,7 +39,7 @@ public class AbstractConfig { private final MapString, Object values; @SuppressWarnings(unchecked) -public AbstractConfig(ConfigDef definition, Map?, ? originals) { +public AbstractConfig(ConfigDef definition, Map?, ? originals, Boolean doLog) { /* check that all the keys are really strings */ for (Object key : originals.keySet()) if (!(key instanceof String)) @@ -47,7 +47,12 @@ public class AbstractConfig { this.originals = (MapString, ?) 
originals; this.values = definition.parse(this.originals); this.used = Collections.synchronizedSet(new HashSetString()); -logAll(); +if (doLog) +logAll(); +} + +public AbstractConfig(ConfigDef definition, Map?, ? originals) { +this(definition, originals, true); } protected Object get(String key) { @@ -167,4 +172,18 @@ public class AbstractConfig { return objects; } +@Override +public boolean equals(Object o) { +if (this == o) return true; +if (o == null || getClass() != o.getClass()) return false; + +AbstractConfig that = (AbstractConfig) o; + +return originals.equals(that.originals); +} + +@Override +public int hashCode() { +return originals.hashCode(); +} } http://git-wip-us.apache.org/repos/asf/kafka/blob/9cefb2a0/core/src/main/scala/kafka/log/LogConfig.scala -- diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index fc41132..c969d16 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -46,7 +46,7 @@ object Defaults { val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable } -case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props) { +case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) { val segmentSize = getInt(LogConfig.SegmentBytesProp) val segmentMs = getLong(LogConfig.SegmentMsProp) http://git-wip-us.apache.org/repos/asf/kafka/blob/9cefb2a0/core/src/test/scala/unit/kafka/log/LogConfigTest.scala -- diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 19dcb47..72e98b3 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -33,28 +33,6 @@ class LogConfigTest extends JUnit3Suite { } @Test - def testFromPropsToProps() { -import scala.util.Random._ -val expected = new Properties() 
-LogConfig.configNames().foreach((name) = { - name match { -case LogConfig.UncleanLeaderElectionEnableProp = expected.setProperty(name, randFrom(true, false)) -case LogConfig.CompressionTypeProp = expected.setProperty(name, randFrom(producer, uncompressed, gzip)) -case LogConfig.CleanupPolicyProp = expected.setProperty(name, randFrom(LogConfig.Compact, LogConfig.Delete)) -