This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 217f45e  KAFKA-7864; validate partitions are 0-based (#6246)
217f45e is described below

commit 217f45ed554b34d5221e1dd3db76e4be892661cf
Author: Ryan Chen <ctch...@users.noreply.github.com>
AuthorDate: Fri Feb 22 09:11:21 2019 -0800

    KAFKA-7864; validate partitions are 0-based (#6246)
    
    Reviewers: Sriharsha Chintalapani <srihar...@apache.org>, Jun Rao <jun...@gmail.com>
---
 core/src/main/scala/kafka/zk/AdminZkClient.scala          |  6 ++++++
 .../kafka/admin/PreferredReplicaElectionCommandTest.scala |  4 ++--
 .../unit/kafka/controller/ControllerIntegrationTest.scala |  4 ++--
 core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala | 15 +++++++++++++++
 4 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 14e7d7e..b10a089 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -125,6 +125,12 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
         throw new InvalidReplicaAssignmentException("Duplicate replica 
assignment found: " + partitionReplicaAssignment)
     )
 
+    val partitionSize = partitionReplicaAssignment.size
+    val sequenceSum = partitionSize * (partitionSize - 1) / 2
+    if (partitionReplicaAssignment.size != 
partitionReplicaAssignment.toSet.size ||
+        partitionReplicaAssignment.keys.filter(_ >= 0).sum != sequenceSum)
+        throw new InvalidReplicaAssignmentException("partitions should be a 
consecutive 0-based integer sequence")
+
     LogConfig.validate(config)
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala
index d03ec04..cf752b8 100644
--- a/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala
@@ -49,9 +49,9 @@ class PreferredReplicaElectionCommandTest extends ZooKeeperTestHarness with Logg
 
   @Test
   def testBasicPreferredReplicaElection() {
-    val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
+    val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
-    val partition = 1
+    val partition = 0
     val preferredReplica = 0
     // create brokers
     val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2")
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 08747a8..f167876 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -385,9 +385,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
 
   @Test
   def testControlledShutdown() {
-    val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
+    val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
-    val partition = 1
+    val partition = 0
     // create brokers
     val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, 
false).map(KafkaConfig.fromProps)
     servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index c120caa..fa8635f 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -66,6 +66,21 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
       adminZkClient.createTopicWithAssignment("test", topicConfig, 
Map(0->Seq(0,1), 1->Seq(0)))
     }
 
+    // partitions should be 0-based
+    intercept[InvalidReplicaAssignmentException] {
+      adminZkClient.createTopicWithAssignment("test", topicConfig, 
Map(1->Seq(1,2), 2->Seq(1,2)))
+    }
+
+    // partitions should be 0-based and consecutive
+    intercept[InvalidReplicaAssignmentException] {
+      adminZkClient.createTopicWithAssignment("test", topicConfig, 
Map(0->Seq(1,2), 0->Seq(1,2), 3->Seq(1,2)))
+    }
+
+    // partitions should be 0-based and consecutive
+    intercept[InvalidReplicaAssignmentException] {
+      adminZkClient.createTopicWithAssignment("test", topicConfig, 
Map(-1->Seq(1,2), 1->Seq(1,2), 2->Seq(1,2), 4->Seq(1,2)))
+    }
+
     // good assignment
     val assignment = Map(0 -> List(0, 1, 2),
                          1 -> List(1, 2, 3))

Reply via email to