Repository: kafka
Updated Branches:
  refs/heads/trunk 701c46b3a -> 21443f214


KAFKA-2641; Upgrade path for ZK authentication

This pull request adds a configuration parameter and a migration tool. It is 
also based on pull request #303, which should go in first.

Author: flavio junqueira <f...@apache.org>
Author: Flavio Junqueira <f...@apache.org>
Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com>

Closes #313 from fpj/KAFKA-2641


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/21443f21
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/21443f21
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/21443f21

Branch: refs/heads/trunk
Commit: 21443f214fc6f1f51037e27f8ece155cf1eb288c
Parents: 701c46b
Author: flavio junqueira <f...@apache.org>
Authored: Fri Oct 23 15:11:07 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Fri Oct 23 15:11:07 2015 -0700

----------------------------------------------------------------------
 bin/zookeeper-security-migration.sh             |  17 ++
 .../apache/kafka/common/security/JaasUtils.java |   5 +
 .../src/main/scala/kafka/admin/AdminUtils.scala |   2 +-
 .../scala/kafka/admin/ZkSecurityMigrator.scala  | 237 +++++++++++++++++++
 .../main/scala/kafka/server/KafkaConfig.scala   |   5 +
 .../main/scala/kafka/server/KafkaServer.scala   |   9 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  15 +-
 .../test/scala/other/kafka/DeleteZKPath.scala   |   1 -
 .../unit/kafka/server/KafkaConfigTest.scala     |   1 +
 9 files changed, 285 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/bin/zookeeper-security-migration.sh
----------------------------------------------------------------------
diff --git a/bin/zookeeper-security-migration.sh 
b/bin/zookeeper-security-migration.sh
new file mode 100755
index 0000000..65fce85
--- /dev/null
+++ b/bin/zookeeper-security-migration.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java 
b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
index b8c870d..dade986 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
@@ -109,6 +109,11 @@ public class JaasUtils {
                 throw new KafkaException("Exception while determining if 
ZooKeeper is secure");
             }
         }
+        /*
+         * Tests fail if we don't reset the login configuration. It is unclear
+         * what is actually triggering this bug.
+         */
+        Configuration.setConfiguration(null);
 
         return isSecurityEnabled;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala 
b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 6fff176..64527de 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -226,7 +226,7 @@ object AdminUtils extends Logging {
 
   def topicExists(zkUtils: ZkUtils, topic: String): Boolean = 
     zkUtils.zkClient.exists(getTopicPath(topic))
-    
+
   def createTopic(zkUtils: ZkUtils,
                   topic: String,
                   partitions: Int, 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala 
b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
new file mode 100644
index 0000000..fce5c03
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.TimeUnit
+import joptsimple.OptionParser
+import org.I0Itec.zkclient.exception.ZkException
+import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
+import org.apache.log4j.Level
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
+import org.apache.zookeeper.data.Stat
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.Code
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection._
+import scala.collection.mutable.Queue
+import scala.concurrent._
+import scala.concurrent.duration._
+
+/**
+ * This tool is to be used when making access to ZooKeeper authenticated or 
+ * the other way around, when removing authenticated access. The exact steps
+ * to migrate a Kafka cluster from unsecure to secure with respect to ZooKeeper
+ * access are the following:
+ * 
+ * 1- Perform a rolling upgrade of Kafka servers, setting zookeeper.set.acl to 
false
+ * and passing a valid JAAS login file via the system property 
+ * java.security.auth.login.config
+ * 2- Perform a second rolling upgrade keeping the system property for the 
login file
+ * and now setting zookeeper.set.acl to true
+ * 3- Finally run this tool. There is a script under ./bin. Run 
+ *   ./bin/zookeeper-security-migration --help
+ * to see the configuration parameters. An example of running it is the 
following:
+ *  ./bin/zookeeper-security-migration --zookeeper.acl=secure 
--zookeeper.connection=localhost:2181
+ * 
+ * To convert a cluster from secure to unsecure, we need to perform the 
following
+ * steps:
+ * 1- Perform a rolling upgrade setting zookeeper.set.acl to false for each 
server
+ * 2- Run this migration tool, setting zookeeper.acl to unsecure
+ * 3- Perform another rolling upgrade to remove the system property setting the
+ * login file (java.security.auth.login.config).
+ */
+
+object ZkSecurityMigrator extends Logging {
+  val usageMessage = ("ZooKeeper Migration Tool Help. This tool updates the 
ACLs of "
+                      + "znodes as part of the process of setting up ZooKeeper 
"
+                      + "authentication.")
+
+  def run(args: Array[String]) {
+    var jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+    val parser = new OptionParser()
+
+    val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make 
the Kafka znodes in ZooKeeper secure or unsecure."
+        + " The options are 'secure' and 
'unsecure'").withRequiredArg().ofType(classOf[String])
+    val jaasFileOpt = parser.accepts("jaas.file", "JAAS Config 
file.").withOptionalArg().ofType(classOf[String])
+    val zkUrlOpt = parser.accepts("zookeeper.connect", "Sets the ZooKeeper 
connect string (ensemble). This parameter " +
+      "takes a comma-separated list of host:port 
pairs.").withRequiredArg().defaultsTo("localhost:2181").
+      ofType(classOf[String])
+    val zkSessionTimeoutOpt = parser.accepts("zookeeper.session.timeout", 
"Sets the ZooKeeper session timeout.").
+      withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000)
+    val zkConnectionTimeoutOpt = 
parser.accepts("zookeeper.connection.timeout", "Sets the ZooKeeper connection 
timeout.").
+      withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000)
+    val helpOpt = parser.accepts("help", "Print usage information.")
+
+    val options = parser.parse(args : _*)
+    if (options.has(helpOpt))
+      CommandLineUtils.printUsageAndDie(parser, usageMessage)
+
+    if ((jaasFile == null) && !options.has(jaasFileOpt)) {
+     val errorMsg = ("No JAAS configuration file has been specified. Please 
make sure that you have set either " + 
+                    "the system property %s or the option 
%s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "--jaas.file")) 
+     System.out.println("ERROR: %s".format(errorMsg))
+     throw new IllegalArgumentException("Incorrect configuration")
+    }
+
+    if (jaasFile == null) {
+      jaasFile = options.valueOf(jaasFileOpt)
+      System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
+    }
+
+    if (!JaasUtils.isZkSecurityEnabled(jaasFile)) {
+      val errorMsg = "Security isn't enabled, most likely the file isn't set 
properly: %s".format(jaasFile)
+      System.out.println("ERROR: %s".format(errorMsg))
+      throw new IllegalArgumentException("Incorrect configuration") 
+    }
+
+    val zkAcl: Boolean = options.valueOf(zkAclOpt) match {
+      case "secure" =>
+        info("zookeeper.acl option is secure")
+        true
+      case "unsecure" =>
+        info("zookeeper.acl option is unsecure")
+        false
+      case _ =>
+        CommandLineUtils.printUsageAndDie(parser, usageMessage)
+    }
+    val zkUrl = options.valueOf(zkUrlOpt)
+    val zkSessionTimeout = options.valueOf(zkSessionTimeoutOpt).intValue
+    val zkConnectionTimeout = options.valueOf(zkConnectionTimeoutOpt).intValue
+    val zkUtils = ZkUtils(zkUrl, zkSessionTimeout, zkConnectionTimeout, zkAcl)
+    val migrator = new ZkSecurityMigrator(zkUtils)
+    migrator.run()
+  }
+
+  def main(args: Array[String]) {
+    try {
+      run(args)
+    } catch {
+        case e: Exception =>
+          e.printStackTrace()
+    }
+  }
+}
+
+class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
+  private val workQueue = new LinkedBlockingQueue[Runnable]
+  private val futures = new Queue[Future[String]]
+
+  private def setAclsRecursively(path: String) = {
+    info("Setting ACL for path %s".format(path))
+    val setPromise = Promise[String]
+    val childrenPromise = Promise[String]
+    futures.synchronized {
+      futures += setPromise.future
+      futures += childrenPromise.future
+    }
+    zkUtils.zkConnection.getZookeeper.setACL(path, 
ZkUtils.DefaultAcls(zkUtils.isSecure), -1, SetACLCallback, setPromise)
+    zkUtils.zkConnection.getZookeeper.getChildren(path, false, 
GetChildrenCallback, childrenPromise)
+  }
+
+  private object GetChildrenCallback extends ChildrenCallback {
+    def processResult(rc: Int,
+                      path: String,
+                      ctx: Object,
+                      children: java.util.List[String]) {
+      val zkHandle = zkUtils.zkConnection.getZookeeper
+      val promise = ctx.asInstanceOf[Promise[String]]
+      Code.get(rc) match {
+        case Code.OK =>
+          // Set ACL for each child
+          for (child <- children.asScala)
+            setAclsRecursively(s"$path/$child")
+          promise success "done"
+        case Code.CONNECTIONLOSS =>
+          zkHandle.getChildren(path, false, GetChildrenCallback, ctx)
+        case Code.NONODE =>
+          warn("Node is gone, it could be have been legitimately deleted: 
%s".format(path))
+          promise success "done"
+        case Code.SESSIONEXPIRED =>
+          // Starting a new session isn't really a problem, but it'd complicate
+          // the logic of the tool, so we quit and let the user re-run it.
+          System.out.println("ZooKeeper session expired while changing ACLs")
+          promise failure 
ZkException.create(KeeperException.create(Code.get(rc)))
+        case _ =>
+          System.out.println("Unexpected return code: %d".format(rc))
+          promise failure 
ZkException.create(KeeperException.create(Code.get(rc)))
+      }
+    }
+  }
+
+  private object SetACLCallback extends StatCallback {
+    def processResult(rc: Int,
+                      path: String,
+                      ctx: Object,
+                      stat: Stat) {
+      val zkHandle = zkUtils.zkConnection.getZookeeper
+      val promise = ctx.asInstanceOf[Promise[String]]
+
+      Code.get(rc) match {
+        case Code.OK =>
+          info("Successfully set ACLs for %s".format(path))
+          promise success "done"
+        case Code.CONNECTIONLOSS =>
+            zkHandle.setACL(path, ZkUtils.DefaultAcls(zkUtils.isSecure), -1, 
SetACLCallback, ctx)
+        case Code.NONODE =>
+          warn("Znode is gone, it could be have been legitimately deleted: 
%s".format(path))
+          promise success "done"
+        case Code.SESSIONEXPIRED =>
+          // Starting a new session isn't really a problem, but it'd complicate
+          // the logic of the tool, so we quit and let the user re-run it.
+          System.out.println("ZooKeeper session expired while changing ACLs")
+          promise failure 
ZkException.create(KeeperException.create(Code.get(rc)))
+        case _ =>
+          System.out.println("Unexpected return code: %d".format(rc))
+          promise failure 
ZkException.create(KeeperException.create(Code.get(rc)))
+      }
+    }
+  }
+
+  private def run(): Unit = {
+    try {
+      for (path <- zkUtils.securePersistentZkPaths) {
+        debug("Going to set ACL for %s".format(path))
+        zkUtils.makeSurePersistentPathExists(path)
+        setAclsRecursively(path)
+      }
+
+      @tailrec
+      def recurse(): Unit = {
+        val future = futures.synchronized { 
+          futures.headOption
+        }
+        future match {
+          case Some(a) =>
+            Await.result(a, 6000 millis)
+            futures.synchronized { futures.dequeue }
+            recurse
+          case None =>
+        }
+      }
+      recurse()
+
+    } finally {
+      zkUtils.close
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 52182b8..d52b5c0 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -39,6 +39,7 @@ object Defaults {
   /** ********* Zookeeper Configuration ***********/
   val ZkSessionTimeoutMs = 6000
   val ZkSyncTimeMs = 2000
+  val ZkEnableSecureAcls = false
 
   /** ********* General Configuration ***********/
   val MaxReservedBrokerId = 1000
@@ -187,6 +188,7 @@ object KafkaConfig {
   val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
   val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
   val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
+  val ZkEnableSecureAclsProp = "zookeeper.set.acl"
   /** ********* General Configuration ***********/
   val MaxReservedBrokerIdProp = "reserved.broker.max.id"
   val BrokerIdProp = "broker.id"
@@ -334,6 +336,7 @@ object KafkaConfig {
   val ZkSessionTimeoutMsDoc = "Zookeeper session timeout"
   val ZkConnectionTimeoutMsDoc = "The max time that the client waits to 
establish a connection to zookeeper"
   val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
+  val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
   /** ********* General Configuration ***********/
   val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
   val BrokerIdDoc = "The broker id for this server. " +
@@ -515,6 +518,7 @@ object KafkaConfig {
       .define(ZkSessionTimeoutMsProp, INT, Defaults.ZkSessionTimeoutMs, HIGH, 
ZkSessionTimeoutMsDoc)
       .define(ZkConnectionTimeoutMsProp, INT, HIGH, ZkConnectionTimeoutMsDoc, 
false)
       .define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, 
ZkSyncTimeMsDoc)
+      .define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, 
HIGH, ZkEnableSecureAclsDoc)
 
       /** ********* General Configuration ***********/
       .define(MaxReservedBrokerIdProp, INT, Defaults.MaxReservedBrokerId, 
atLeast(0), MEDIUM, MaxReservedBrokerIdProp)
@@ -702,6 +706,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends 
AbstractConfig(Kafka
   val zkConnectionTimeoutMs: Int =
     
Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).map(_.toInt).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
   val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp)
+  val zkEnableSecureAcls: Boolean = 
getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
 
   /** ********* General Configuration ***********/
   val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 617f807..c6ea84e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -260,12 +260,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime, threadNamePr
         ""
     }
 
+    val secureAclsEnabled = 
JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))
 && config.zkEnableSecureAcls
+    
+    if(config.zkEnableSecureAcls && !secureAclsEnabled) {
+      throw new java.lang.SecurityException("zkEnableSecureAcls is true, but 
the verification of the JAAS login file failed.")    
+    }
     if (chroot.length > 1) {
       val zkConnForChrootCreation = config.zkConnect.substring(0, 
config.zkConnect.indexOf("/"))
       val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation, 
                                               config.zkSessionTimeoutMs,
                                               config.zkConnectionTimeoutMs,
-                                              
JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+                                              secureAclsEnabled)
       zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
       info("Created zookeeper path " + chroot)
       zkClientForChrootCreation.zkClient.close()
@@ -274,7 +279,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime, threadNamePr
     val zkUtils = ZkUtils(config.zkConnect,
                           config.zkSessionTimeoutMs,
                           config.zkConnectionTimeoutMs,
-                          
JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+                          secureAclsEnabled)
     zkUtils.setupCommonPaths()
     zkUtils
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala 
b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 17e63e2..a39e61c 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -112,7 +112,7 @@ object ZkUtils {
    * Get calls that only depend on static paths
    */
   def getTopicPath(topic: String): String = {
-    BrokerTopicsPath + "/" + topic
+    ZkUtils.BrokerTopicsPath + "/" + topic
   }
 
   def getTopicPartitionsPath(topic: String): String = {
@@ -126,7 +126,7 @@ object ZkUtils {
     getTopicPartitionPath(topic, partitionId) + "/" + "state"
     
   def getEntityConfigRootPath(entityType: String): String =
-    EntityConfigPath + "/" + entityType
+    ZkUtils.EntityConfigPath + "/" + entityType
 
   def getEntityConfigPath(entityType: String, entity: String): String =
     getEntityConfigRootPath(entityType) + "/" + entity
@@ -149,6 +149,15 @@ class ZkUtils(val zkClient: ZkClient,
                               BrokerSequenceIdPath,
                               IsrChangeNotificationPath)
 
+  val securePersistentZkPaths = Seq(BrokerIdsPath,
+                                    BrokerTopicsPath,
+                                    EntityConfigChangesPath,
+                                    getEntityConfigRootPath(ConfigType.Topic),
+                                    getEntityConfigRootPath(ConfigType.Client),
+                                    DeleteTopicsPath,
+                                    BrokerSequenceIdPath,
+                                    IsrChangeNotificationPath)
+
   val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure)
   
   def getController(): Int = {
@@ -713,7 +722,7 @@ class ZkUtils(val zkClient: ZkClient,
   def deletePartition(brokerId: Int, topic: String) {
     val brokerIdPath = BrokerIdsPath + "/" + brokerId
     zkClient.delete(brokerIdPath)
-    val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
+    val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + 
brokerId
     zkClient.delete(brokerPartTopicPath)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/test/scala/other/kafka/DeleteZKPath.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala 
b/core/src/test/scala/other/kafka/DeleteZKPath.scala
index 92bde88..202bf43 100755
--- a/core/src/test/scala/other/kafka/DeleteZKPath.scala
+++ b/core/src/test/scala/other/kafka/DeleteZKPath.scala
@@ -31,7 +31,6 @@ object DeleteZKPath {
 
     val config = new ConsumerConfig(Utils.loadProps(args(0)))
     val zkPath = args(1)
-
     val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, 
config.zkConnectionTimeoutMs, false)
 
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b1a7f21..4059dc2 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -398,6 +398,7 @@ class KafkaConfigTest {
         case KafkaConfig.ZkSessionTimeoutMsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ZkConnectionTimeoutMsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ZkSyncTimeMsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ZkEnableSecureAclsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
 
         case KafkaConfig.BrokerIdProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.NumNetworkThreadsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")

Reply via email to