kafka git commit: KAFKA-2626: Handle null keys and value validation properly in OffsetStorageWriter.

2015-10-23 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/trunk 2e6177359 -> 6f2f1f984


KAFKA-2626: Handle null keys and value validation properly in OffsetStorageWriter.

Author: Ewen Cheslack-Postava 

Reviewers: Gwen Shapira

Closes #345 from ewencp/kafka-2626-offset-storage-writer-null-values


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f2f1f98
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f2f1f98
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f2f1f98

Branch: refs/heads/trunk
Commit: 6f2f1f9843f537b9bda3aa3951a867fdee661761
Parents: 2e61773
Author: Ewen Cheslack-Postava 
Authored: Fri Oct 23 17:01:33 2015 -0700
Committer: Gwen Shapira 
Committed: Fri Oct 23 17:01:33 2015 -0700

--
 .../kafka/copycat/storage/OffsetUtils.java  |  5 ++
 .../storage/OffsetStorageWriterTest.java| 71 +++-
 2 files changed, 59 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/6f2f1f98/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
--
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
index 8d78a57..9ba7662 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
@@ -34,6 +34,11 @@ public class OffsetUtils {
 }
 
     public static <K, V> void validateFormat(Map<K, V> offsetData) {
+        // Both keys and values for offsets may be null. For values, this is a useful way to delete offsets or indicate
+        // that there's no usable concept of offsets in your source system.
+        if (offsetData == null)
+            return;
+
         for (Map.Entry<K, V> entry : offsetData.entrySet()) {
             if (!(entry.getKey() instanceof String))
                 throw new DataException("Offsets may only use String keys");
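
For illustration only, a minimal sketch of what the patched validateFormat now accepts (the class name and the offset key/value below are hypothetical, not part of the patch): a null offsets map is skipped entirely instead of failing on entrySet(), while String-keyed maps are checked as before.

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.copycat.storage.OffsetUtils;

    public class NullOffsetSketch {
        public static void main(String[] args) {
            // After this patch, a null offset map is a no-op for validation
            // rather than a NullPointerException on offsetData.entrySet().
            OffsetUtils.validateFormat(null);

            // A String-keyed map still validates exactly as before.
            Map<String, Object> offsets = Collections.singletonMap("position", (Object) "line-42");
            OffsetUtils.validateFormat(offsets);
        }
    }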

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f2f1f98/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
--
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
index e33ecd0..3dd0b52 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
@@ -45,15 +45,11 @@ public class OffsetStorageWriterTest {
     private static final String NAMESPACE = "namespace";
     // Copycat format - any types should be accepted here
     private static final Map<String, String> OFFSET_KEY = Collections.singletonMap("key", "key");
-    private static final List<Object> OFFSET_KEY_WRAPPED = Arrays.asList(NAMESPACE, OFFSET_KEY);
     private static final Map<String, Integer> OFFSET_VALUE = Collections.singletonMap("key", 12);
 
     // Serialized
     private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
     private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
-    private static final Map<ByteBuffer, ByteBuffer> OFFSETS_SERIALIZED
-            = Collections.singletonMap(ByteBuffer.wrap(OFFSET_KEY_SERIALIZED),
-            ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));
 
 @Mock private OffsetBackingStore store;
 @Mock private Converter keyConverter;
@@ -79,7 +75,7 @@ public class OffsetStorageWriterTest {
     public void testWriteFlush() throws Exception {
         @SuppressWarnings("unchecked")
         Callback<Void> callback = PowerMock.createMock(Callback.class);
-        expectStore(callback, false);
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);
 
 PowerMock.replayAll();
 
@@ -91,6 +87,41 @@ public class OffsetStorageWriterTest {
 PowerMock.verifyAll();
 }
 
+    // It should be possible to set offset values to null
+    @Test
+    public void testWriteNullValueFlush() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> callback = PowerMock.createMock(Callback.class);
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null, callback, false, null);
+
+        PowerMock.replayAll();
+
+        writer.offset(OFFSET_KEY, null);
+
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+
+

[3/3] kafka git commit: KAFKA-2371: Add distributed support for Copycat.

2015-10-23 Thread gwenshap
KAFKA-2371: Add distributed support for Copycat.

This adds coordination between DistributedHerders using the generalized consumer
support, allowing automatic balancing of connectors and tasks across workers. A
few pieces that require interaction between workers (resolving config
inconsistencies, forwarding of configuration changes to the leader worker) are
incomplete because they require REST API support to implement properly.

Author: Ewen Cheslack-Postava 

Reviewers: Jason Gustafson, Gwen Shapira

Closes #321 from ewencp/kafka-2371-distributed-herder


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e617735
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e617735
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e617735

Branch: refs/heads/trunk
Commit: 2e61773590c0ba86cb8813e6ba17bf6ee33f4461
Parents: 21443f2
Author: Ewen Cheslack-Postava 
Authored: Fri Oct 23 16:37:30 2015 -0700
Committer: Gwen Shapira 
Committed: Fri Oct 23 16:37:30 2015 -0700

--
 build.gradle|   1 +
 checkstyle/import-control.xml   |   1 +
 .../clients/consumer/RoundRobinAssignor.java|  35 +-
 .../consumer/internals/AbstractCoordinator.java |   8 +-
 .../kafka/common/utils/CircularIterator.java|  54 ++
 config/copycat-distributed.properties   |   2 +
 .../kafka/copycat/file/FileStreamSinkTask.java  |  12 +-
 .../copycat/file/FileStreamSourceTask.java  |  17 +-
 .../kafka/copycat/cli/CopycatDistributed.java   |   7 +-
 .../kafka/copycat/runtime/ConnectorConfig.java  |   2 +-
 .../kafka/copycat/runtime/TaskConfig.java   |  54 ++
 .../apache/kafka/copycat/runtime/Worker.java| 145 +++-
 .../runtime/distributed/ClusterConfigState.java |  40 +-
 .../runtime/distributed/CopycatProtocol.java| 246 +++
 .../runtime/distributed/DistributedHerder.java  | 733 +--
 .../distributed/DistributedHerderConfig.java| 192 +
 .../runtime/distributed/NotLeaderException.java |  38 +
 .../runtime/distributed/WorkerCoordinator.java  | 288 
 .../runtime/distributed/WorkerGroupMember.java  | 184 +
 .../distributed/WorkerRebalanceListener.java|  38 +
 .../runtime/standalone/StandaloneHerder.java| 168 ++---
 .../copycat/storage/KafkaConfigStorage.java |  64 +-
 .../storage/KafkaOffsetBackingStore.java|   2 +
 .../kafka/copycat/util/ConnectorTaskId.java |  10 +-
 .../kafka/copycat/runtime/WorkerTest.java   | 199 -
 .../distributed/DistributedHerderTest.java  | 436 ++-
 .../distributed/WorkerCoordinatorTest.java  | 436 +++
 .../standalone/StandaloneHerderTest.java|  45 +-
 .../copycat/storage/KafkaConfigStorageTest.java |  49 +-
 .../apache/kafka/copycat/util/TestFuture.java   |  10 +-
 tests/kafkatest/services/copycat.py |  67 +-
 .../kafkatest/tests/copycat_distributed_test.py |  67 +-
 tests/kafkatest/tests/copycat_test.py   |   5 +-
 .../templates/copycat-distributed.properties|   7 +-
 34 files changed, 2966 insertions(+), 696 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/build.gradle
--
diff --git a/build.gradle b/build.gradle
index 16fb981..128c195 100644
--- a/build.gradle
+++ b/build.gradle
@@ -754,6 +754,7 @@ project(':copycat:runtime') {
 testCompile "$easymock"
 testCompile "$powermock"
 testCompile "$powermock_easymock"
+testCompile project(':clients').sourceSets.test.output
 testRuntime "$slf4jlog4j"
 testRuntime project(":copycat:json")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/checkstyle/import-control.xml
--
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 6474865..e1ea93c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -146,6 +146,7 @@
 
 
 
+
 
 
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index c5ea2bb..b8dc253 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -14,11 +14,11 @@ package org.apache.kafka.clients.consumer;
 
 import 

[1/3] kafka git commit: KAFKA-2371: Add distributed support for Copycat.

2015-10-23 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/trunk 21443f214 -> 2e6177359


http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
--
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
index 0463b85..1213656 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
@@ -17,20 +17,19 @@
 
 package org.apache.kafka.copycat.runtime.distributed;
 
-import org.apache.kafka.copycat.connector.Connector;
-import org.apache.kafka.copycat.connector.Task;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.copycat.connector.ConnectorContext;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
-import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.TaskConfig;
 import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.source.SourceConnector;
 import org.apache.kafka.copycat.source.SourceTask;
 import org.apache.kafka.copycat.storage.KafkaConfigStorage;
 import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.copycat.util.FutureCallback;
+import org.apache.kafka.copycat.util.TestFuture;
 import org.easymock.EasyMock;
+import org.easymock.IAnswer;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -39,251 +38,354 @@ import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
 
-import java.util.*;
-import java.util.concurrent.TimeUnit;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({DistributedHerder.class})
+@PrepareForTest(DistributedHerder.class)
 @PowerMockIgnore("javax.management.*")
 public class DistributedHerderTest {
-    private static final List<String> CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2", "sink-test3");
-    private static final List<String> SOURCE_CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2");
-    private static final List<String> SINK_CONNECTOR_NAMES = Arrays.asList("sink-test3");
-    private static final String TOPICS_LIST_STR = "topic1,topic2";
+    private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
+    static {
+        HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
+        HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        HERDER_CONFIG.put(DistributedHerderConfig.GROUP_ID_CONFIG, "test-copycat-group");
+    }
 
-    private static final Map<String, String> CONFIG_STORAGE_CONFIG = Collections.singletonMap(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
+    private static final String CONN1 = "sourceA";
+    private static final String CONN2 = "sourceA";
+    private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0);
+    private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1);
+    private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);
+    private static final Integer MAX_TASKS = 3;
+    private static final Map<String, String> CONNECTOR_CONFIG = new HashMap<>();
+    static {
+        CONNECTOR_CONFIG.put(ConnectorConfig.NAME_CONFIG, "sourceA");
+        CONNECTOR_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
+        CONNECTOR_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        CONNECTOR_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
+    }
+    private static final Map<String, String> TASK_CONFIG = new HashMap<>();
+    static {
+        TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, BogusSourceTask.class.getName());
+    }
+    private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS = new HashMap<>();
+    static {
+        TASK_CONFIGS.put(TASK0, TASK_CONFIG);
+        TASK_CONFIGS.put(TASK1, TASK_CONFIG);
+        TASK_CONFIGS.put(TASK2, TASK_CONFIG);
+    }
+    private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, 

[2/3] kafka git commit: KAFKA-2371: Add distributed support for Copycat.

2015-10-23 Thread gwenshap
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
--
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
new file mode 100644
index 000..bd2ba56
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
+public class DistributedHerderConfig extends AbstractConfig {
+    private static final ConfigDef CONFIG;
+
+    /*
+     * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
+     * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+     */
+
+    /**
+     * group.id
+     */
+    public static final String GROUP_ID_CONFIG = "group.id";
+    private static final String GROUP_ID_DOC = "A unique string that identifies the Copycat cluster group this worker belongs to.";
+
+    /**
+     * session.timeout.ms
+     */
+    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
+    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";
+
+    /**
+     * heartbeat.interval.ms
+     */
+    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
+    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
+
+    /**
+     * worker.sync.timeout.ms
+     */
+    public static final String WORKER_SYNC_TIMEOUT_MS_CONFIG = "worker.sync.timeout.ms";
+    private static final String WORKER_SYNC_TIMEOUT_MS_DOC = "When the worker is out of sync with other workers and needs" +
+            " to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and" +
+            " waiting a backoff period before rejoining.";
+
+    /**
+     * group.unsync.timeout.ms
+     */
+    public static final String WORKER_UNSYNC_BACKOFF_MS_CONFIG = "worker.unsync.backoff.ms";
+    private static final String WORKER_UNSYNC_BACKOFF_MS_DOC = "When the worker is out of sync with other workers and " +
+            " fails to catch up within worker.sync.timeout.ms, leave the Copycat cluster for this long before rejoining.";
+    public static final int WORKER_UNSYNC_BACKOFF_MS_DEFAULT = 5 * 60 * 1000;
+
+    static {
+        CONFIG = new ConfigDef()
+                .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                        ConfigDef.Type.LIST,
+                        ConfigDef.Importance.HIGH,
+                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+                .define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, GROUP_ID_DOC)
+                .define(SESSION_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        3,
+                        ConfigDef.Importance.HIGH,
+                        SESSION_TIMEOUT_MS_DOC)
+                .define(HEARTBEAT_INTERVAL_MS_CONFIG,
+                        ConfigDef.Type.INT,
+   
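
For orientation, a hypothetical sketch of how a worker might construct this config, assuming DistributedHerderConfig follows the usual pattern for Kafka config classes of a constructor that forwards a properties map to AbstractConfig (that constructor is not shown in the excerpt above); the values are illustrative only.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.copycat.runtime.distributed.DistributedHerderConfig;

    public class HerderConfigSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(DistributedHerderConfig.GROUP_ID_CONFIG, "copycat-cluster");
            props.put(DistributedHerderConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            props.put(DistributedHerderConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
            props.put(DistributedHerderConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG, "3000");

            // Assumed constructor: forwards props to AbstractConfig(CONFIG, props);
            // getInt() is inherited from AbstractConfig.
            DistributedHerderConfig config = new DistributedHerderConfig(props);
            System.out.println("heartbeat.interval.ms = "
                    + config.getInt(DistributedHerderConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
        }
    }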

kafka git commit: KAFKA-2641; Upgrade path for ZK authentication

2015-10-23 Thread junrao
Repository: kafka
Updated Branches:
  refs/heads/trunk 701c46b3a -> 21443f214


KAFKA-2641; Upgrade path for ZK authentication

This pull request adds a configuration parameter and a migration tool. It is 
also based on pull request #303, which should go in first.

Author: flavio junqueira 
Author: Flavio Junqueira 
Author: Ismael Juma 

Reviewers: Ismael Juma , Jun Rao 

Closes #313 from fpj/KAFKA-2641


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/21443f21
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/21443f21
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/21443f21

Branch: refs/heads/trunk
Commit: 21443f214fc6f1f51037e27f8ece155cf1eb288c
Parents: 701c46b
Author: flavio junqueira 
Authored: Fri Oct 23 15:11:07 2015 -0700
Committer: Jun Rao 
Committed: Fri Oct 23 15:11:07 2015 -0700

--
 bin/zookeeper-security-migration.sh |  17 ++
 .../apache/kafka/common/security/JaasUtils.java |   5 +
 .../src/main/scala/kafka/admin/AdminUtils.scala |   2 +-
 .../scala/kafka/admin/ZkSecurityMigrator.scala  | 237 +++
 .../main/scala/kafka/server/KafkaConfig.scala   |   5 +
 .../main/scala/kafka/server/KafkaServer.scala   |   9 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  15 +-
 .../test/scala/other/kafka/DeleteZKPath.scala   |   1 -
 .../unit/kafka/server/KafkaConfigTest.scala |   1 +
 9 files changed, 285 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/bin/zookeeper-security-migration.sh
--
diff --git a/bin/zookeeper-security-migration.sh b/bin/zookeeper-security-migration.sh
new file mode 100755
index 000..65fce85
--- /dev/null
+++ b/bin/zookeeper-security-migration.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
index b8c870d..dade986 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
@@ -109,6 +109,11 @@ public class JaasUtils {
                 throw new KafkaException("Exception while determining if ZooKeeper is secure");
             }
         }
+        /*
+         * Tests fail if we don't reset the login configuration. It is unclear
+         * what is actually triggering this bug.
+         */
+        Configuration.setConfiguration(null);
 
         return isSecurityEnabled;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/admin/AdminUtils.scala
--
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 6fff176..64527de 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -226,7 +226,7 @@ object AdminUtils extends Logging {
 
   def topicExists(zkUtils: ZkUtils, topic: String): Boolean = 
 zkUtils.zkClient.exists(getTopicPath(topic))
-
+
   def createTopic(zkUtils: ZkUtils,
   topic: String,
   partitions: Int, 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
--
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
new