cmccabe commented on a change in pull request #10343:
URL: https://github.com/apache/kafka/pull/10343#discussion_r609096892



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1007,6 +1014,112 @@ int bestLeader(int[] replicas, int[] isr, boolean unclean) {
         return ControllerResult.of(records, null);
     }
 
+    ControllerResult<List<CreatePartitionsTopicResult>>
+            createPartitions(List<CreatePartitionsTopic> topics) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        List<CreatePartitionsTopicResult> results = new ArrayList<>();
+        for (CreatePartitionsTopic topic : topics) {
+            ApiError apiError = ApiError.NONE;
+            try {
+                createPartitions(topic, records);
+            } catch (ApiException e) {
+                apiError = ApiError.fromThrowable(e);
+            } catch (Exception e) {
+                log.error("Unexpected createPartitions error for {}", topic, 
e);
+                apiError = ApiError.fromThrowable(e);
+            }
+            results.add(new CreatePartitionsTopicResult().
+                setName(topic.name()).
+                setErrorCode(apiError.error().code()).
+                setErrorMessage(apiError.message()));
+        }
+        return new ControllerResult<>(records, results, true);
+    }
+
+    void createPartitions(CreatePartitionsTopic topic,
+                          List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topic.name());
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException();
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException();
+        }
+        if (topic.count() == topicInfo.parts.size()) {
+            throw new InvalidPartitionsException("Topic already has " +
+                topicInfo.parts.size() + " partition(s).");
+        } else if (topic.count() < topicInfo.parts.size()) {
+            throw new InvalidPartitionsException("The topic " + topic.name() + 
" currently " +
+                "has " + topicInfo.parts.size() + " partition(s); " + 
topic.count() +
+                " would not be an increase.");
+        }
+        int additional = topic.count() - topicInfo.parts.size();
+        if (topic.assignments() != null) {
+            if (topic.assignments().size() != additional) {
+                throw new InvalidReplicaAssignmentException("Attempted to add 
" + additional +
+                    " additional partition(s), but only " + 
topic.assignments().size() +
+                    " assignment(s) were specified.");
+            }
+        }
+        Iterator<PartitionControlInfo> iterator = 
topicInfo.parts.values().iterator();
+        if (!iterator.hasNext()) {
+            throw new UnknownServerException("Invalid state: topic " + 
topic.name() +
+                " appears to have no partitions.");
+        }
+        PartitionControlInfo partitionInfo = iterator.next();
+        if (partitionInfo.replicas.length > Short.MAX_VALUE) {
+            throw new UnknownServerException("Invalid replication factor " +
+                partitionInfo.replicas.length + ": expected a number less than 
65536.");
+        }
+        short replicationFactor = (short) partitionInfo.replicas.length;
+        int startPartitionId = topicInfo.parts.size();
+
+        List<List<Integer>> placements = null;
+        if (topic.assignments() != null) {
+            placements = new ArrayList<>();
+            for (CreatePartitionsAssignment assignment : topic.assignments()) {

Review comment:
       Yes, let's factor out the validation logic into a separate function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to