Repository: kafka Updated Branches: refs/heads/trunk 846362724 -> 1d2bd6284
KAFKA-2337; Verify that metric names will not collide when creating new topics; patched by Grant Henke; reviewed by Edward Ribeiro, Ashish Singh and Gwen Shapira Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d2bd628 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d2bd628 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d2bd628 Branch: refs/heads/trunk Commit: 1d2bd6284b06b579c901e6be8919a8a27dbe11ee Parents: 8463627 Author: Grant Henke <granthe...@gmail.com> Authored: Mon Jul 20 16:15:42 2015 -0700 Committer: Gwen Shapira <csh...@gmail.com> Committed: Mon Jul 20 16:15:42 2015 -0700 ---------------------------------------------------------------------- .../src/main/scala/kafka/admin/AdminUtils.scala | 17 ++++++-- .../main/scala/kafka/admin/TopicCommand.scala | 4 +- core/src/main/scala/kafka/common/Topic.scala | 22 +++++++++++ .../test/scala/unit/kafka/admin/AdminTest.scala | 16 +++++++- .../scala/unit/kafka/common/TopicTest.scala | 41 ++++++++++++++++++++ 5 files changed, 95 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2bd628/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 2b4e028..4cc2376 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -250,8 +250,19 @@ object AdminUtils extends Logging { require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.") val topicPath = ZkUtils.getTopicPath(topic) - if(!update && zkClient.exists(topicPath)) - throw new TopicExistsException("Topic \"%s\" already exists.".format(topic)) + + if (!update) { + if (zkClient.exists(topicPath)) + throw new TopicExistsException("Topic \"%s\" already exists.".format(topic)) + else if (Topic.hasCollisionChars(topic)) { + val allTopics = ZkUtils.getAllTopics(zkClient) + val collidingTopics = allTopics.filter(t => Topic.hasCollision(topic, t)) + if (collidingTopics.nonEmpty) { + throw new InvalidTopicException("Topic \"%s\" collides with existing topics: %s".format(topic, collidingTopics.mkString(", "))) + } + } + } + partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment)) // write out the config if there is any, this isn't transactional with the partition assignments @@ -260,7 +271,7 @@ object AdminUtils extends Logging { // create the partition assignment writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update) } - + private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) { try { val zkPath = ZkUtils.getTopicPath(topic) http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2bd628/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index a90aa87..4e28bf1 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -85,9 +85,11 @@ object TopicCommand extends Logging { def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topic = opts.options.valueOf(opts.topicOpt) val configs = parseTopicConfigsToBeAdded(opts) + if (Topic.hasCollisionChars(topic)) + println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.") if (opts.options.has(opts.replicaAssignmentOpt)) { val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs, update = false) } else { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt) val partitions = opts.options.valueOf(opts.partitionsOpt).intValue http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2bd628/core/src/main/scala/kafka/common/Topic.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala index 32595d6..db75d4b 100644 --- a/core/src/main/scala/kafka/common/Topic.scala +++ b/core/src/main/scala/kafka/common/Topic.scala @@ -43,4 +43,26 @@ object Topic { case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'") } } + + /** + * Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. + * + * @param topic The topic to check for colliding character + * @return true if the topic has collision characters + */ + def hasCollisionChars(topic: String): Boolean = { + topic.contains("_") || topic.contains(".") + } + + /** + * Returns true if the topicNames collide due to a period ('.') or underscore ('_') in the same position. + * + * @param topicA A topic to check for collision + * @param topicB A topic to check for collision + * @return true if the topics collide + */ + def hasCollision(topicA: String, topicB: String): Boolean = { + topicA.replace('.', '_') == topicB.replace('.', '_') + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2bd628/core/src/test/scala/unit/kafka/admin/AdminTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 252ac81..93f200e 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -24,7 +24,7 @@ import kafka.utils._ import kafka.log._ import kafka.zk.ZooKeeperTestHarness import kafka.utils.{Logging, ZkUtils, TestUtils} -import kafka.common.{TopicExistsException, TopicAndPartition} +import kafka.common.{InvalidTopicException, TopicExistsException, TopicAndPartition} import kafka.server.{KafkaServer, KafkaConfig} import java.io.File import TestUtils._ @@ -134,6 +134,20 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { } } + @Test + def testTopicCreationWithCollision() { + val topic = "test.topic" + val collidingTopic = "test_topic" + TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) + // create the topic + AdminUtils.createTopic(zkClient, topic, 3, 1) + + intercept[InvalidTopicException] { + // shouldn't be able to create a topic that collides + AdminUtils.createTopic(zkClient, collidingTopic, 3, 1) + } + } + private def getBrokersWithPartitionDir(servers: Iterable[KafkaServer], topic: String, partitionId: Int): Set[Int] = { servers.filter(server => new File(server.config.logDirs.head, topic + "-" + partitionId).exists) .map(_.config.brokerId) http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2bd628/core/src/test/scala/unit/kafka/common/TopicTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/common/TopicTest.scala b/core/src/test/scala/unit/kafka/common/TopicTest.scala index 79532c8..17525fe 100644 --- a/core/src/test/scala/unit/kafka/common/TopicTest.scala +++ b/core/src/test/scala/unit/kafka/common/TopicTest.scala @@ -57,4 +57,45 @@ class TopicTest { } } } + + @Test + def testTopicHasCollisionChars() = { + val falseTopics = List("start", "end", "middle", "many") + val trueTopics = List( + ".start", "end.", "mid.dle", ".ma.ny.", + "_start", "end_", "mid_dle", "_ma_ny." + ) + + falseTopics.foreach( t => + assertFalse(Topic.hasCollisionChars(t)) + ) + + trueTopics.foreach( t => + assertTrue(Topic.hasCollisionChars(t)) + ) + } + + @Test + def testTopicHasCollision() = { + val periodFirstMiddleLastNone = List(".topic", "to.pic", "topic.", "topic") + val underscoreFirstMiddleLastNone = List("_topic", "to_pic", "topic_", "topic") + + // Self + periodFirstMiddleLastNone.foreach { t => + assertTrue(Topic.hasCollision(t, t)) + } + underscoreFirstMiddleLastNone.foreach { t => + assertTrue(Topic.hasCollision(t, t)) + } + + // Same Position + periodFirstMiddleLastNone.zip(underscoreFirstMiddleLastNone).foreach { case (t1, t2) => + assertTrue(Topic.hasCollision(t1, t2)) + } + + // Different Position + periodFirstMiddleLastNone.zip(underscoreFirstMiddleLastNone.reverse).foreach { case (t1, t2) => + assertFalse(Topic.hasCollision(t1, t2)) + } + } }