This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 217f45e KAFKA-7864; validate partitions are 0-based (#6246) 217f45e is described below commit 217f45ed554b34d5221e1dd3db76e4be892661cf Author: Ryan Chen <ctch...@users.noreply.github.com> AuthorDate: Fri Feb 22 09:11:21 2019 -0800 KAFKA-7864; validate partitions are 0-based (#6246) Reviewers: Sriharsha Chintalapani <srihar...@apache.org>, Jun Rao <jun...@gmail.com> --- core/src/main/scala/kafka/zk/AdminZkClient.scala | 6 ++++++ .../kafka/admin/PreferredReplicaElectionCommandTest.scala | 4 ++-- .../unit/kafka/controller/ControllerIntegrationTest.scala | 4 ++-- core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala | 15 +++++++++++++++ 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index 14e7d7e..b10a089 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -125,6 +125,12 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging { throw new InvalidReplicaAssignmentException("Duplicate replica assignment found: " + partitionReplicaAssignment) ) + val partitionSize = partitionReplicaAssignment.size + val sequenceSum = partitionSize * (partitionSize - 1) / 2 + if (partitionReplicaAssignment.size != partitionReplicaAssignment.toSet.size || + partitionReplicaAssignment.keys.filter(_ >= 0).sum != sequenceSum) + throw new InvalidReplicaAssignmentException("partitions should be a consecutive 0-based integer sequence") + LogConfig.validate(config) } diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala index d03ec04..cf752b8 100644 --- a/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala @@ -49,9 +49,9 @@ class PreferredReplicaElectionCommandTest extends ZooKeeperTestHarness with Logg @Test def testBasicPreferredReplicaElection() { - val expectedReplicaAssignment = Map(1 -> List(0, 1, 2)) + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" - val partition = 1 + val partition = 0 val preferredReplica = 0 // create brokers val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2") diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index 08747a8..f167876 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -385,9 +385,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { @Test def testControlledShutdown() { - val expectedReplicaAssignment = Map(1 -> List(0, 1, 2)) + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" - val partition = 1 + val partition = 0 // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps) servers = serverConfigs.reverseMap(s => TestUtils.createServer(s)) diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala index c120caa..fa8635f 100644 --- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala @@ -66,6 +66,21 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware adminZkClient.createTopicWithAssignment("test", topicConfig, Map(0->Seq(0,1), 1->Seq(0))) } + // partitions should be 0-based + intercept[InvalidReplicaAssignmentException] { + adminZkClient.createTopicWithAssignment("test", topicConfig, Map(1->Seq(1,2), 2->Seq(1,2))) + } + + // partitions should be 0-based and consecutive + intercept[InvalidReplicaAssignmentException] { + adminZkClient.createTopicWithAssignment("test", topicConfig, Map(0->Seq(1,2), 0->Seq(1,2), 3->Seq(1,2))) + } + + // partitions should be 0-based and consecutive + intercept[InvalidReplicaAssignmentException] { + adminZkClient.createTopicWithAssignment("test", topicConfig, Map(-1->Seq(1,2), 1->Seq(1,2), 2->Seq(1,2), 4->Seq(1,2))) + } + // good assignment val assignment = Map(0 -> List(0, 1, 2), 1 -> List(1, 2, 3))