Repository: kafka
Updated Branches:
  refs/heads/trunk 846362724 -> 1d2bd6284


KAFKA-2337;  Verify that metric names will not collide when creating new 
topics; patched by Grant Henke; reviewed by Edward Ribeiro, Ashish Singh and 
Gwen Shapira


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d2bd628
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d2bd628
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d2bd628

Branch: refs/heads/trunk
Commit: 1d2bd6284b06b579c901e6be8919a8a27dbe11ee
Parents: 8463627
Author: Grant Henke <granthe...@gmail.com>
Authored: Mon Jul 20 16:15:42 2015 -0700
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Mon Jul 20 16:15:42 2015 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala | 17 ++++++--
 .../main/scala/kafka/admin/TopicCommand.scala   |  4 +-
 core/src/main/scala/kafka/common/Topic.scala    | 22 +++++++++++
 .../test/scala/unit/kafka/admin/AdminTest.scala | 16 +++++++-
 .../scala/unit/kafka/common/TopicTest.scala     | 41 ++++++++++++++++++++
 5 files changed, 95 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2bd628/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala 
b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 2b4e028..4cc2376 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -250,8 +250,19 @@ object AdminUtils extends Logging {
     require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, 
"All partitions should have the same number of replicas.")
 
     val topicPath = ZkUtils.getTopicPath(topic)
-    if(!update && zkClient.exists(topicPath))
-      throw new TopicExistsException("Topic \"%s\" already 
exists.".format(topic))
+
+    if (!update) {
+      if (zkClient.exists(topicPath))
+        throw new TopicExistsException("Topic \"%s\" already 
exists.".format(topic))
+      else if (Topic.hasCollisionChars(topic)) {
+        val allTopics = ZkUtils.getAllTopics(zkClient)
+        val collidingTopics = allTopics.filter(t => Topic.hasCollision(topic, 
t))
+        if (collidingTopics.nonEmpty) {
+          throw new InvalidTopicException("Topic \"%s\" collides with existing 
topics: %s".format(topic, collidingTopics.mkString(", ")))
+        }
+      }
+    }
+
     partitionReplicaAssignment.values.foreach(reps => require(reps.size == 
reps.toSet.size, "Duplicate replica assignment found: "  + 
partitionReplicaAssignment))
     
     // write out the config if there is any, this isn't transactional with the 
partition assignments
@@ -260,7 +271,7 @@ object AdminUtils extends Logging {
     // create the partition assignment
     writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, 
update)
   }
-  
+
   private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, 
replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
     try {
       val zkPath = ZkUtils.getTopicPath(topic)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2bd628/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala 
b/core/src/main/scala/kafka/admin/TopicCommand.scala
index a90aa87..4e28bf1 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -85,9 +85,11 @@ object TopicCommand extends Logging {
   def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     val topic = opts.options.valueOf(opts.topicOpt)
     val configs = parseTopicConfigsToBeAdded(opts)
+    if (Topic.hasCollisionChars(topic))
+      println("WARNING: Due to limitations in metric names, topics with a 
period ('.') or underscore ('_') could collide. To avoid issues it is best to 
use either, but not both.")
     if (opts.options.has(opts.replicaAssignmentOpt)) {
       val assignment = 
parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, 
topic, assignment, configs)
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, 
topic, assignment, configs, update = false)
     } else {
       CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, 
opts.partitionsOpt, opts.replicationFactorOpt)
       val partitions = opts.options.valueOf(opts.partitionsOpt).intValue

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2bd628/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala 
b/core/src/main/scala/kafka/common/Topic.scala
index 32595d6..db75d4b 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -43,4 +43,26 @@ object Topic {
       case None => throw new InvalidTopicException("topic name " + topic + " 
is illegal,  contains a character other than ASCII alphanumerics, '.', '_' and 
'-'")
     }
   }
+
+  /**
+   * Due to limitations in metric names, topics with a period ('.') or 
underscore ('_') could collide.
+   *
+   * @param topic The topic to check for colliding character
+   * @return true if the topic has collision characters
+   */
+  def hasCollisionChars(topic: String): Boolean = {
+    topic.contains("_") || topic.contains(".")
+  }
+
+  /**
+   * Returns true if the topicNames collide due to a period ('.') or 
underscore ('_') in the same position.
+   *
+   * @param topicA A topic to check for collision
+   * @param topicB A topic to check for collision
+   * @return true if the topics collide
+   */
+  def hasCollision(topicA: String, topicB: String): Boolean = {
+    topicA.replace('.', '_') == topicB.replace('.', '_')
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2bd628/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala 
b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 252ac81..93f200e 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -24,7 +24,7 @@ import kafka.utils._
 import kafka.log._
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{Logging, ZkUtils, TestUtils}
-import kafka.common.{TopicExistsException, TopicAndPartition}
+import kafka.common.{InvalidTopicException, TopicExistsException, 
TopicAndPartition}
 import kafka.server.{KafkaServer, KafkaConfig}
 import java.io.File
 import TestUtils._
@@ -134,6 +134,20 @@ class AdminTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging {
     }
   }
 
+  @Test
+  def testTopicCreationWithCollision() {
+    val topic = "test.topic"
+    val collidingTopic = "test_topic"
+    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
+    // create the topic
+    AdminUtils.createTopic(zkClient, topic, 3, 1)
+
+    intercept[InvalidTopicException] {
+      // shouldn't be able to create a topic that collides
+      AdminUtils.createTopic(zkClient, collidingTopic, 3, 1)
+    }
+  }
+
   private def getBrokersWithPartitionDir(servers: Iterable[KafkaServer], 
topic: String, partitionId: Int): Set[Int] = {
     servers.filter(server => new File(server.config.logDirs.head, topic + "-" 
+ partitionId).exists)
            .map(_.config.brokerId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2bd628/core/src/test/scala/unit/kafka/common/TopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/TopicTest.scala 
b/core/src/test/scala/unit/kafka/common/TopicTest.scala
index 79532c8..17525fe 100644
--- a/core/src/test/scala/unit/kafka/common/TopicTest.scala
+++ b/core/src/test/scala/unit/kafka/common/TopicTest.scala
@@ -57,4 +57,45 @@ class TopicTest {
       }
     }
   }
+
+  @Test
+  def testTopicHasCollisionChars() = {
+    val falseTopics = List("start", "end", "middle", "many")
+    val trueTopics = List(
+      ".start", "end.", "mid.dle", ".ma.ny.",
+      "_start", "end_", "mid_dle", "_ma_ny."
+    )
+
+    falseTopics.foreach( t =>
+      assertFalse(Topic.hasCollisionChars(t))
+    )
+
+    trueTopics.foreach( t =>
+      assertTrue(Topic.hasCollisionChars(t))
+    )
+  }
+
+  @Test
+  def testTopicHasCollision() = {
+    val periodFirstMiddleLastNone = List(".topic", "to.pic", "topic.", "topic")
+    val underscoreFirstMiddleLastNone = List("_topic", "to_pic", "topic_", 
"topic")
+
+    // Self
+    periodFirstMiddleLastNone.foreach { t =>
+      assertTrue(Topic.hasCollision(t, t))
+    }
+    underscoreFirstMiddleLastNone.foreach { t =>
+      assertTrue(Topic.hasCollision(t, t))
+    }
+
+    // Same Position
+    periodFirstMiddleLastNone.zip(underscoreFirstMiddleLastNone).foreach { 
case (t1, t2) =>
+      assertTrue(Topic.hasCollision(t1, t2))
+    }
+
+    // Different Position
+    
periodFirstMiddleLastNone.zip(underscoreFirstMiddleLastNone.reverse).foreach { 
case (t1, t2) =>
+      assertFalse(Topic.hasCollision(t1, t2))
+    }
+  }
 }

Reply via email to