Repository: kafka Updated Branches: refs/heads/trunk 701c46b3a -> 21443f214
KAFKA-2641; Upgrade path for ZK authentication This pull request adds a configuration parameter and a migration tool. It is also based on pull request #303, which should go in first. Author: flavio junqueira <f...@apache.org> Author: Flavio Junqueira <f...@apache.org> Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com> Closes #313 from fpj/KAFKA-2641 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/21443f21 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/21443f21 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/21443f21 Branch: refs/heads/trunk Commit: 21443f214fc6f1f51037e27f8ece155cf1eb288c Parents: 701c46b Author: flavio junqueira <f...@apache.org> Authored: Fri Oct 23 15:11:07 2015 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Fri Oct 23 15:11:07 2015 -0700 ---------------------------------------------------------------------- bin/zookeeper-security-migration.sh | 17 ++ .../apache/kafka/common/security/JaasUtils.java | 5 + .../src/main/scala/kafka/admin/AdminUtils.scala | 2 +- .../scala/kafka/admin/ZkSecurityMigrator.scala | 237 +++++++++++++++++++ .../main/scala/kafka/server/KafkaConfig.scala | 5 + .../main/scala/kafka/server/KafkaServer.scala | 9 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 15 +- .../test/scala/other/kafka/DeleteZKPath.scala | 1 - .../unit/kafka/server/KafkaConfigTest.scala | 1 + 9 files changed, 285 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/bin/zookeeper-security-migration.sh ---------------------------------------------------------------------- diff --git a/bin/zookeeper-security-migration.sh b/bin/zookeeper-security-migration.sh new file mode 100755 index 0000000..65fce85 --- /dev/null +++ b/bin/zookeeper-security-migration.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed 
to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator "$@" http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java index b8c870d..dade986 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java @@ -109,6 +109,11 @@ public class JaasUtils { throw new KafkaException("Exception while determining if ZooKeeper is secure"); } } + /* + * Tests fail if we don't reset the login configuration. It is unclear + * what is actually triggering this bug.
+ */ + Configuration.setConfiguration(null); return isSecurityEnabled; } http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 6fff176..64527de 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -226,7 +226,7 @@ object AdminUtils extends Logging { def topicExists(zkUtils: ZkUtils, topic: String): Boolean = zkUtils.zkClient.exists(getTopicPath(topic)) - + def createTopic(zkUtils: ZkUtils, topic: String, partitions: Int, http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala new file mode 100644 index 0000000..fce5c03 --- /dev/null +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.admin + +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit +import joptsimple.OptionParser +import org.I0Itec.zkclient.exception.ZkException +import kafka.utils.{Logging, ZkUtils, CommandLineUtils} +import org.apache.log4j.Level +import org.apache.kafka.common.security.JaasUtils +import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback} +import org.apache.zookeeper.data.Stat +import org.apache.zookeeper.KeeperException +import org.apache.zookeeper.KeeperException.Code +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection._ +import scala.collection.mutable.Queue +import scala.concurrent._ +import scala.concurrent.duration._ + +/** + * This tool is to be used when making access to ZooKeeper authenticated or + * the other way around, when removing authenticated access. The exact steps + * to migrate a Kafka cluster from unsecure to secure with respect to ZooKeeper + * access are the following: + * + * 1- Perform a rolling upgrade of Kafka servers, setting zookeeper.set.acl to false + * and passing a valid JAAS login file via the system property + * java.security.auth.login.config + * 2- Perform a second rolling upgrade keeping the system property for the login file + * and now setting zookeeper.set.acl to true + * 3- Finally run this tool. There is a script under ./bin. Run + * ./bin/zookeeper-security-migration --help + * to see the configuration parameters. 
An example of running it is the following: + * ./bin/zookeeper-security-migration --zookeeper.acl=secure --zookeeper.connect=localhost:2181 + * + * To convert a cluster from secure to unsecure, we need to perform the following + * steps: + * 1- Perform a rolling upgrade setting zookeeper.set.acl to false for each server + * 2- Run this migration tool, setting zookeeper.acl to unsecure + * 3- Perform another rolling upgrade to remove the system property setting the + * login file (java.security.auth.login.config). + */ + +object ZkSecurityMigrator extends Logging { + val usageMessage = ("ZooKeeper Migration Tool Help. This tool updates the ACLs of " + + "znodes as part of the process of setting up ZooKeeper " + + "authentication.") + + def run(args: Array[String]) { + var jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) + val parser = new OptionParser() + + val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure." + + " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String]) + val jaasFileOpt = parser.accepts("jaas.file", "JAAS Config file.").withOptionalArg().ofType(classOf[String]) + val zkUrlOpt = parser.accepts("zookeeper.connect", "Sets the ZooKeeper connect string (ensemble). This parameter " + + "takes a comma-separated list of host:port pairs.").withRequiredArg().defaultsTo("localhost:2181"). + ofType(classOf[String]) + val zkSessionTimeoutOpt = parser.accepts("zookeeper.session.timeout", "Sets the ZooKeeper session timeout."). + withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000) + val zkConnectionTimeoutOpt = parser.accepts("zookeeper.connection.timeout", "Sets the ZooKeeper connection timeout.").
+ withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000) + val helpOpt = parser.accepts("help", "Print usage information.") + + val options = parser.parse(args : _*) + if (options.has(helpOpt)) + CommandLineUtils.printUsageAndDie(parser, usageMessage) + + if ((jaasFile == null) && !options.has(jaasFileOpt)) { + val errorMsg = ("No JAAS configuration file has been specified. Please make sure that you have set either " + + "the system property %s or the option %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "--jaas.file")) + System.out.println("ERROR: %s".format(errorMsg)) + throw new IllegalArgumentException("Incorrect configuration") + } + + if (jaasFile == null) { + jaasFile = options.valueOf(jaasFileOpt) + System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile) + } + + if (!JaasUtils.isZkSecurityEnabled(jaasFile)) { + val errorMsg = "Security isn't enabled, most likely the file isn't set properly: %s".format(jaasFile) + System.out.println("ERROR: %s".format(errorMsg)) + throw new IllegalArgumentException("Incorrect configuration") + } + + val zkAcl: Boolean = options.valueOf(zkAclOpt) match { + case "secure" => + info("zookeeper.acl option is secure") + true + case "unsecure" => + info("zookeeper.acl option is unsecure") + false + case _ => + CommandLineUtils.printUsageAndDie(parser, usageMessage) + } + val zkUrl = options.valueOf(zkUrlOpt) + val zkSessionTimeout = options.valueOf(zkSessionTimeoutOpt).intValue + val zkConnectionTimeout = options.valueOf(zkConnectionTimeoutOpt).intValue + val zkUtils = ZkUtils(zkUrl, zkSessionTimeout, zkConnectionTimeout, zkAcl) + val migrator = new ZkSecurityMigrator(zkUtils) + migrator.run() + } + + def main(args: Array[String]) { + try { + run(args) + } catch { + case e: Exception => + e.printStackTrace() + } + } +} + +class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { + private val workQueue = new LinkedBlockingQueue[Runnable] + private val futures = new Queue[Future[String]] + + private def 
setAclsRecursively(path: String) = { + info("Setting ACL for path %s".format(path)) + val setPromise = Promise[String] + val childrenPromise = Promise[String] + futures.synchronized { + futures += setPromise.future + futures += childrenPromise.future + } + zkUtils.zkConnection.getZookeeper.setACL(path, ZkUtils.DefaultAcls(zkUtils.isSecure), -1, SetACLCallback, setPromise) + zkUtils.zkConnection.getZookeeper.getChildren(path, false, GetChildrenCallback, childrenPromise) + } + + private object GetChildrenCallback extends ChildrenCallback { + def processResult(rc: Int, + path: String, + ctx: Object, + children: java.util.List[String]) { + val zkHandle = zkUtils.zkConnection.getZookeeper + val promise = ctx.asInstanceOf[Promise[String]] + Code.get(rc) match { + case Code.OK => + // Set ACL for each child + for (child <- children.asScala) + setAclsRecursively(s"$path/$child") + promise success "done" + case Code.CONNECTIONLOSS => + zkHandle.getChildren(path, false, GetChildrenCallback, ctx) + case Code.NONODE => + warn("Node is gone, it could be have been legitimately deleted: %s".format(path)) + promise success "done" + case Code.SESSIONEXPIRED => + // Starting a new session isn't really a problem, but it'd complicate + // the logic of the tool, so we quit and let the user re-run it. 
+ System.out.println("ZooKeeper session expired while changing ACLs") + promise failure ZkException.create(KeeperException.create(Code.get(rc))) + case _ => + System.out.println("Unexpected return code: %d".format(rc)) + promise failure ZkException.create(KeeperException.create(Code.get(rc))) + } + } + } + + private object SetACLCallback extends StatCallback { + def processResult(rc: Int, + path: String, + ctx: Object, + stat: Stat) { + val zkHandle = zkUtils.zkConnection.getZookeeper + val promise = ctx.asInstanceOf[Promise[String]] + + Code.get(rc) match { + case Code.OK => + info("Successfully set ACLs for %s".format(path)) + promise success "done" + case Code.CONNECTIONLOSS => + zkHandle.setACL(path, ZkUtils.DefaultAcls(zkUtils.isSecure), -1, SetACLCallback, ctx) + case Code.NONODE => + warn("Znode is gone, it could be have been legitimately deleted: %s".format(path)) + promise success "done" + case Code.SESSIONEXPIRED => + // Starting a new session isn't really a problem, but it'd complicate + // the logic of the tool, so we quit and let the user re-run it. 
+ System.out.println("ZooKeeper session expired while changing ACLs") + promise failure ZkException.create(KeeperException.create(Code.get(rc))) + case _ => + System.out.println("Unexpected return code: %d".format(rc)) + promise failure ZkException.create(KeeperException.create(Code.get(rc))) + } + } + } + + private def run(): Unit = { + try { + for (path <- zkUtils.securePersistentZkPaths) { + debug("Going to set ACL for %s".format(path)) + zkUtils.makeSurePersistentPathExists(path) + setAclsRecursively(path) + } + + @tailrec + def recurse(): Unit = { + val future = futures.synchronized { + futures.headOption + } + future match { + case Some(a) => + Await.result(a, 6000 millis) + futures.synchronized { futures.dequeue } + recurse + case None => + } + } + recurse() + + } finally { + zkUtils.close + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 52182b8..d52b5c0 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -39,6 +39,7 @@ object Defaults { /** ********* Zookeeper Configuration ***********/ val ZkSessionTimeoutMs = 6000 val ZkSyncTimeMs = 2000 + val ZkEnableSecureAcls = false /** ********* General Configuration ***********/ val MaxReservedBrokerId = 1000 @@ -187,6 +188,7 @@ object KafkaConfig { val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms" val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms" val ZkSyncTimeMsProp = "zookeeper.sync.time.ms" + val ZkEnableSecureAclsProp = "zookeeper.set.acl" /** ********* General Configuration ***********/ val MaxReservedBrokerIdProp = "reserved.broker.max.id" val BrokerIdProp = "broker.id" @@ -334,6 +336,7 @@ object KafkaConfig { val ZkSessionTimeoutMsDoc = "Zookeeper 
session timeout" val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to zookeeper" val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader" + val ZkEnableSecureAclsDoc = "Set client to use secure ACLs" /** ********* General Configuration ***********/ val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id" val BrokerIdDoc = "The broker id for this server. " + @@ -515,6 +518,7 @@ object KafkaConfig { .define(ZkSessionTimeoutMsProp, INT, Defaults.ZkSessionTimeoutMs, HIGH, ZkSessionTimeoutMsDoc) .define(ZkConnectionTimeoutMsProp, INT, HIGH, ZkConnectionTimeoutMsDoc, false) .define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, ZkSyncTimeMsDoc) + .define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, HIGH, ZkEnableSecureAclsDoc) /** ********* General Configuration ***********/ .define(MaxReservedBrokerIdProp, INT, Defaults.MaxReservedBrokerId, atLeast(0), MEDIUM, MaxReservedBrokerIdProp) @@ -702,6 +706,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka val zkConnectionTimeoutMs: Int = Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).map(_.toInt).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp)) val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp) + val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp) /** ********* General Configuration ***********/ val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp) http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 617f807..c6ea84e 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -260,12 +260,17 @@ class KafkaServer(val 
config: KafkaConfig, time: Time = SystemTime, threadNamePr "" } + val secureAclsEnabled = JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)) && config.zkEnableSecureAcls + + if(config.zkEnableSecureAcls && !secureAclsEnabled) { + throw new java.lang.SecurityException("zkEnableSecureAcls is true, but the verification of the JAAS login file failed.") + } if (chroot.length > 1) { val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/")) val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))) + secureAclsEnabled) zkClientForChrootCreation.makeSurePersistentPathExists(chroot) info("Created zookeeper path " + chroot) zkClientForChrootCreation.zkClient.close() @@ -274,7 +279,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))) + secureAclsEnabled) zkUtils.setupCommonPaths() zkUtils } http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 17e63e2..a39e61c 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -112,7 +112,7 @@ object ZkUtils { * Get calls that only depend on static paths */ def getTopicPath(topic: String): String = { - BrokerTopicsPath + "/" + topic + ZkUtils.BrokerTopicsPath + "/" + topic } def getTopicPartitionsPath(topic: String): String = { @@ -126,7 +126,7 @@ object ZkUtils { getTopicPartitionPath(topic, partitionId) + "/" + "state" 
def getEntityConfigRootPath(entityType: String): String = - EntityConfigPath + "/" + entityType + ZkUtils.EntityConfigPath + "/" + entityType def getEntityConfigPath(entityType: String, entity: String): String = getEntityConfigRootPath(entityType) + "/" + entity @@ -149,6 +149,15 @@ class ZkUtils(val zkClient: ZkClient, BrokerSequenceIdPath, IsrChangeNotificationPath) + val securePersistentZkPaths = Seq(BrokerIdsPath, + BrokerTopicsPath, + EntityConfigChangesPath, + getEntityConfigRootPath(ConfigType.Topic), + getEntityConfigRootPath(ConfigType.Client), + DeleteTopicsPath, + BrokerSequenceIdPath, + IsrChangeNotificationPath) + val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure) def getController(): Int = { @@ -713,7 +722,7 @@ class ZkUtils(val zkClient: ZkClient, def deletePartition(brokerId: Int, topic: String) { val brokerIdPath = BrokerIdsPath + "/" + brokerId zkClient.delete(brokerIdPath) - val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId + val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + brokerId zkClient.delete(brokerPartTopicPath) } http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/test/scala/other/kafka/DeleteZKPath.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala b/core/src/test/scala/other/kafka/DeleteZKPath.scala index 92bde88..202bf43 100755 --- a/core/src/test/scala/other/kafka/DeleteZKPath.scala +++ b/core/src/test/scala/other/kafka/DeleteZKPath.scala @@ -31,7 +31,6 @@ object DeleteZKPath { val config = new ConsumerConfig(Utils.loadProps(args(0))) val zkPath = args(1) - val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) try { http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index b1a7f21..4059dc2 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -398,6 +398,7 @@ class KafkaConfigTest { case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean") case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")