kafka git commit: KAFKA-2626: Handle null keys and value validation properly in OffsetStorageWriter.
Repository: kafka Updated Branches: refs/heads/trunk 2e6177359 -> 6f2f1f984 KAFKA-2626: Handle null keys and value validation properly in OffsetStorageWriter. Author: Ewen Cheslack-Postava Reviewers: Gwen Shapira Closes #345 from ewencp/kafka-2626-offset-storage-writer-null-values Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f2f1f98 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f2f1f98 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f2f1f98 Branch: refs/heads/trunk Commit: 6f2f1f9843f537b9bda3aa3951a867fdee661761 Parents: 2e61773 Author: Ewen Cheslack-Postava Authored: Fri Oct 23 17:01:33 2015 -0700 Committer: Gwen Shapira Committed: Fri Oct 23 17:01:33 2015 -0700 -- .../kafka/copycat/storage/OffsetUtils.java | 5 ++ .../storage/OffsetStorageWriterTest.java| 71 +++- 2 files changed, 59 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/6f2f1f98/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java -- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java index 8d78a57..9ba7662 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java @@ -34,6 +34,11 @@ public class OffsetUtils { } public static void validateFormat(Map offsetData) { +// Both keys and values for offsets may be null. For values, this is a useful way to delete offsets or indicate +// that there's not usable concept of offsets in your source system. 
+if (offsetData == null) +return; + for (Map.Entry entry : offsetData.entrySet()) { if (!(entry.getKey() instanceof String)) throw new DataException("Offsets may only use String keys"); http://git-wip-us.apache.org/repos/asf/kafka/blob/6f2f1f98/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java -- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java index e33ecd0..3dd0b52 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java @@ -45,15 +45,11 @@ public class OffsetStorageWriterTest { private static final String NAMESPACE = "namespace"; // Copycat format - any types should be accepted here private static final Map OFFSET_KEY = Collections.singletonMap("key", "key"); -private static final List OFFSET_KEY_WRAPPED = Arrays.asList(NAMESPACE, OFFSET_KEY); private static final Map OFFSET_VALUE = Collections.singletonMap("key", 12); // Serialized private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(); -private static final Map OFFSETS_SERIALIZED -= Collections.singletonMap(ByteBuffer.wrap(OFFSET_KEY_SERIALIZED), -ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED)); @Mock private OffsetBackingStore store; @Mock private Converter keyConverter; @@ -79,7 +75,7 @@ public class OffsetStorageWriterTest { public void testWriteFlush() throws Exception { @SuppressWarnings("unchecked") Callback callback = PowerMock.createMock(Callback.class); -expectStore(callback, false); +expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null); PowerMock.replayAll(); @@ -91,6 +87,41 @@ public class 
OffsetStorageWriterTest { PowerMock.verifyAll(); } +// It should be possible to set offset values to null +@Test +public void testWriteNullValueFlush() throws Exception { +@SuppressWarnings("unchecked") +Callback callback = PowerMock.createMock(Callback.class); +expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null, callback, false, null); + +PowerMock.replayAll(); + +writer.offset(OFFSET_KEY, null); + +assertTrue(writer.beginFlush()); +writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); + +
[3/3] kafka git commit: KAFKA-2371: Add distributed support for Copycat.
KAFKA-2371: Add distributed support for Copycat. This adds coordination between DistributedHerders using the generalized consumer support, allowing automatic balancing of connectors and tasks across workers. A few pieces that require interaction between workers (resolving config inconsistencies, forwarding of configuration changes to the leader worker) are incomplete because they require REST API support to implement properly. Author: Ewen Cheslack-Postava Reviewers: Jason Gustafson, Gwen Shapira Closes #321 from ewencp/kafka-2371-distributed-herder Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e617735 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e617735 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e617735 Branch: refs/heads/trunk Commit: 2e61773590c0ba86cb8813e6ba17bf6ee33f4461 Parents: 21443f2 Author: Ewen Cheslack-Postava Authored: Fri Oct 23 16:37:30 2015 -0700 Committer: Gwen Shapira Committed: Fri Oct 23 16:37:30 2015 -0700 -- build.gradle| 1 + checkstyle/import-control.xml | 1 + .../clients/consumer/RoundRobinAssignor.java| 35 +- .../consumer/internals/AbstractCoordinator.java | 8 +- .../kafka/common/utils/CircularIterator.java| 54 ++ config/copycat-distributed.properties | 2 + .../kafka/copycat/file/FileStreamSinkTask.java | 12 +- .../copycat/file/FileStreamSourceTask.java | 17 +- .../kafka/copycat/cli/CopycatDistributed.java | 7 +- .../kafka/copycat/runtime/ConnectorConfig.java | 2 +- .../kafka/copycat/runtime/TaskConfig.java | 54 ++ .../apache/kafka/copycat/runtime/Worker.java| 145 +++- .../runtime/distributed/ClusterConfigState.java | 40 +- .../runtime/distributed/CopycatProtocol.java| 246 +++ .../runtime/distributed/DistributedHerder.java | 733 +-- .../distributed/DistributedHerderConfig.java| 192 + .../runtime/distributed/NotLeaderException.java | 38 + .../runtime/distributed/WorkerCoordinator.java | 288 .../runtime/distributed/WorkerGroupMember.java | 184 
+ .../distributed/WorkerRebalanceListener.java| 38 + .../runtime/standalone/StandaloneHerder.java| 168 ++--- .../copycat/storage/KafkaConfigStorage.java | 64 +- .../storage/KafkaOffsetBackingStore.java| 2 + .../kafka/copycat/util/ConnectorTaskId.java | 10 +- .../kafka/copycat/runtime/WorkerTest.java | 199 - .../distributed/DistributedHerderTest.java | 436 ++- .../distributed/WorkerCoordinatorTest.java | 436 +++ .../standalone/StandaloneHerderTest.java| 45 +- .../copycat/storage/KafkaConfigStorageTest.java | 49 +- .../apache/kafka/copycat/util/TestFuture.java | 10 +- tests/kafkatest/services/copycat.py | 67 +- .../kafkatest/tests/copycat_distributed_test.py | 67 +- tests/kafkatest/tests/copycat_test.py | 5 +- .../templates/copycat-distributed.properties| 7 +- 34 files changed, 2966 insertions(+), 696 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/build.gradle -- diff --git a/build.gradle b/build.gradle index 16fb981..128c195 100644 --- a/build.gradle +++ b/build.gradle @@ -754,6 +754,7 @@ project(':copycat:runtime') { testCompile "$easymock" testCompile "$powermock" testCompile "$powermock_easymock" +testCompile project(':clients').sourceSets.test.output testRuntime "$slf4jlog4j" testRuntime project(":copycat:json") } http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/checkstyle/import-control.xml -- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 6474865..e1ea93c 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -146,6 +146,7 @@ + http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java index c5ea2bb..b8dc253 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java @@ -14,11 +14,11 @@ package org.apache.kafka.clients.consumer; import
[1/3] kafka git commit: KAFKA-2371: Add distributed support for Copycat.
Repository: kafka Updated Branches: refs/heads/trunk 21443f214 -> 2e6177359 http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java -- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java index 0463b85..1213656 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java @@ -17,20 +17,19 @@ package org.apache.kafka.copycat.runtime.distributed; -import org.apache.kafka.copycat.connector.Connector; -import org.apache.kafka.copycat.connector.Task; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.copycat.connector.ConnectorContext; import org.apache.kafka.copycat.runtime.ConnectorConfig; -import org.apache.kafka.copycat.runtime.HerderConnectorContext; +import org.apache.kafka.copycat.runtime.TaskConfig; import org.apache.kafka.copycat.runtime.Worker; -import org.apache.kafka.copycat.sink.SinkConnector; -import org.apache.kafka.copycat.sink.SinkTask; import org.apache.kafka.copycat.source.SourceConnector; import org.apache.kafka.copycat.source.SourceTask; import org.apache.kafka.copycat.storage.KafkaConfigStorage; import org.apache.kafka.copycat.util.Callback; import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.apache.kafka.copycat.util.FutureCallback; +import org.apache.kafka.copycat.util.TestFuture; import org.easymock.EasyMock; +import org.easymock.IAnswer; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,251 +38,354 @@ import org.powermock.api.easymock.annotation.Mock; import org.powermock.core.classloader.annotations.PowerMockIgnore; import 
org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; -import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; @RunWith(PowerMockRunner.class) -@PrepareForTest({DistributedHerder.class}) +@PrepareForTest(DistributedHerder.class) @PowerMockIgnore("javax.management.*") public class DistributedHerderTest { -private static final List CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2", "sink-test3"); -private static final List SOURCE_CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2"); -private static final List SINK_CONNECTOR_NAMES = Arrays.asList("sink-test3"); -private static final String TOPICS_LIST_STR = "topic1,topic2"; +private static final MapHERDER_CONFIG = new HashMap<>(); +static { +HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic"); +HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); +HERDER_CONFIG.put(DistributedHerderConfig.GROUP_ID_CONFIG, "test-copycat-group"); +} -private static final Map CONFIG_STORAGE_CONFIG = Collections.singletonMap(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic"); +private static final String CONN1 = "sourceA"; +private static final String CONN2 = "sourceA"; +private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0); +private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1); +private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2); +private static final Integer MAX_TASKS = 3; +private static final Map CONNECTOR_CONFIG = new HashMap<>(); +static { +CONNECTOR_CONFIG.put(ConnectorConfig.NAME_CONFIG, "sourceA"); +CONNECTOR_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString()); 
+CONNECTOR_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); +CONNECTOR_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName()); +} +private static final Map TASK_CONFIG = new HashMap<>(); +static { +TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, BogusSourceTask.class.getName()); +} +private static final HashMap > TASK_CONFIGS = new HashMap<>(); +static { +TASK_CONFIGS.put(TASK0, TASK_CONFIG); +TASK_CONFIGS.put(TASK1, TASK_CONFIG); +TASK_CONFIGS.put(TASK2, TASK_CONFIG); +} +private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1,
[2/3] kafka git commit: KAFKA-2371: Add distributed support for Copycat.
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java -- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java new file mode 100644 index 000..bd2ba56 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.runtime.distributed; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.SSLConfigs; +import org.apache.kafka.common.config.SaslConfigs; + +import java.util.Map; + +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; + +public class DistributedHerderConfig extends AbstractConfig { +private static final ConfigDef CONFIG; + +/* + * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS + * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE. + */ + +/** + * group.id + */ +public static final String GROUP_ID_CONFIG = "group.id"; +private static final String GROUP_ID_DOC = "A unique string that identifies the Copycat cluster group this worker belongs to."; + +/** + * session.timeout.ms + */ +public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms"; +private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities."; + +/** + * heartbeat.interval.ms + */ +public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms"; +private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. 
It can be adjusted even lower to control the expected time for normal rebalances."; + +/** + * worker.sync.timeout.ms + */ +public static final String WORKER_SYNC_TIMEOUT_MS_CONFIG = "worker.sync.timeout.ms"; +private static final String WORKER_SYNC_TIMEOUT_MS_DOC = "When the worker is out of sync with other workers and needs" + +" to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and" + +" waiting a backoff period before rejoining."; + +/** + * group.unsync.timeout.ms + */ +public static final String WORKER_UNSYNC_BACKOFF_MS_CONFIG = "worker.unsync.backoff.ms"; +private static final String WORKER_UNSYNC_BACKOFF_MS_DOC = "When the worker is out of sync with other workers and " + +" fails to catch up within worker.sync.timeout.ms, leave the Copycat cluster for this long before rejoining."; +public static final int WORKER_UNSYNC_BACKOFF_MS_DEFAULT = 5 * 60 * 1000; + +static { +CONFIG = new ConfigDef() +.define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, +ConfigDef.Type.LIST, +ConfigDef.Importance.HIGH, +CommonClientConfigs.BOOSTRAP_SERVERS_DOC) +.define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, GROUP_ID_DOC) +.define(SESSION_TIMEOUT_MS_CONFIG, +ConfigDef.Type.INT, +3, +ConfigDef.Importance.HIGH, +SESSION_TIMEOUT_MS_DOC) +.define(HEARTBEAT_INTERVAL_MS_CONFIG, +ConfigDef.Type.INT, +
kafka git commit: KAFKA-2641; Upgrade path for ZK authentication
Repository: kafka Updated Branches: refs/heads/trunk 701c46b3a -> 21443f214 KAFKA-2641; Upgrade path for ZK authentication This pull request adds a configuration parameter and a migration tool. It is also based on pull request #303, which should go in first. Author: flavio junqueira Author: Flavio Junqueira Author: Ismael Juma Reviewers: Ismael Juma, Jun Rao Closes #313 from fpj/KAFKA-2641 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/21443f21 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/21443f21 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/21443f21 Branch: refs/heads/trunk Commit: 21443f214fc6f1f51037e27f8ece155cf1eb288c Parents: 701c46b Author: flavio junqueira Authored: Fri Oct 23 15:11:07 2015 -0700 Committer: Jun Rao Committed: Fri Oct 23 15:11:07 2015 -0700 -- bin/zookeeper-security-migration.sh | 17 ++ .../apache/kafka/common/security/JaasUtils.java | 5 + .../src/main/scala/kafka/admin/AdminUtils.scala | 2 +- .../scala/kafka/admin/ZkSecurityMigrator.scala | 237 +++ .../main/scala/kafka/server/KafkaConfig.scala | 5 + .../main/scala/kafka/server/KafkaServer.scala | 9 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 15 +- .../test/scala/other/kafka/DeleteZKPath.scala | 1 - .../unit/kafka/server/KafkaConfigTest.scala | 1 + 9 files changed, 285 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/bin/zookeeper-security-migration.sh -- diff --git a/bin/zookeeper-security-migration.sh b/bin/zookeeper-security-migration.sh new file mode 100755 index 000..65fce85 --- /dev/null +++ b/bin/zookeeper-security-migration.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator $@ http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java -- diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java index b8c870d..dade986 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java @@ -109,6 +109,11 @@ public class JaasUtils { throw new KafkaException("Exception while determining if ZooKeeper is secure"); } } +/* + * Tests fail if we don't reset the login configuration. It is unclear + * what is actually triggering this bug. 
+ */ +Configuration.setConfiguration(null); return isSecurityEnabled; } http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/admin/AdminUtils.scala -- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 6fff176..64527de 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -226,7 +226,7 @@ object AdminUtils extends Logging { def topicExists(zkUtils: ZkUtils, topic: String): Boolean = zkUtils.zkClient.exists(getTopicPath(topic)) - + def createTopic(zkUtils: ZkUtils, topic: String, partitions: Int, http://git-wip-us.apache.org/repos/asf/kafka/blob/21443f21/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala -- diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala new