josefk31 commented on code in PR #17604:
URL: https://github.com/apache/kafka/pull/17604#discussion_r1819789470
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1136,6 +1139,34 @@ ControllerResult<AlterPartitionResponseData>
alterPartition(
return ControllerResult.of(records, response);
}
+ /**
+ * Validates that a batch of topics will create fewer than {@value
MAX_PARTITIONS_PER_BATCH} partitions. Exceeding this number of partitions per batch
+ * has led to out-of-memory exceptions. We use this validation to fail
early and avoid allocating the memory.
+ * This validation assumes that all the topics in the batch are valid and
can be created.
+ *
+ * @param request a batch of topics to create.
+ * @param defaultNumPartitions default number of partitions to assign if
unspecified.
+ * @throws PolicyViolationException if total number of partitions exceeds
{@value MAX_PARTITIONS_PER_BATCH}.
+ */
+ static void
validateEstimatedTotalNumberOfPartitions(CreateTopicsRequestData request, int
defaultNumPartitions) {
+ int totalPartitions = 0;
+ for (CreatableTopic topic: request.topics()) {
+ if (topic.assignments().isEmpty()) {
+ if (topic.numPartitions() == -1) {
+ totalPartitions += defaultNumPartitions;
+ } else if (topic.numPartitions() > 0) {
+ totalPartitions += topic.numPartitions();
+ }
+ } else {
+ totalPartitions += topic.assignments().size();
Review Comment:
I reread the code and have a better understanding now. Outlining my reasoning
here:
There are two ways to create topics:
1. Manually assign partitions and replicas for topics (`assignments`)
2. Specify a number of partitions and have Kafka automatically derive the
replica assignments.
We don't actually know how many topics (and partitions) will be created
until the various validations in each `createTopic` call have run. The most
recent code calculates the maximum number of partitions that could be created
and throws an exception if the request goes above the limit. I think this is
fine because the user's intent was still to create a batch with too many
partitions. Excluding misconfigured or invalid partitions from the limit would
add an edge case...
IIUC, each `assignment` can be treated as creating `1` partition, so we can
count the size of this collection towards the limit.
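
To make the counting rule concrete, here is a minimal standalone sketch of the upper-bound estimate as I understand it. `TopicSpec` and `PartitionEstimate` are hypothetical stand-ins, not Kafka classes. The `limit` parameter, the `>` comparison and the use of `IllegalArgumentException` in place of `PolicyViolationException` are assumptions, since the diff above is cut off before the check itself.

```java
import java.util.List;

// Hypothetical stand-in for CreatableTopic; not the real Kafka class.
final class TopicSpec {
    final int numPartitions;               // -1 means "use the default"
    final List<List<Integer>> assignments; // one replica list per manually assigned partition

    TopicSpec(int numPartitions, List<List<Integer>> assignments) {
        this.numPartitions = numPartitions;
        this.assignments = assignments;
    }
}

final class PartitionEstimate {
    // Upper bound on partitions the batch could create, assuming every topic is valid.
    static int estimate(List<TopicSpec> topics, int defaultNumPartitions) {
        int total = 0;
        for (TopicSpec topic : topics) {
            if (!topic.assignments.isEmpty()) {
                // Manual assignment: each entry counts as one partition.
                total += topic.assignments.size();
            } else if (topic.numPartitions == -1) {
                // Unspecified: fall back to the default partition count.
                total += defaultNumPartitions;
            } else if (topic.numPartitions > 0) {
                total += topic.numPartitions;
            }
            // Zero or other negative counts add nothing; later validation rejects them.
        }
        return total;
    }

    // Fails fast before any per-topic work is done (assumed comparison: strictly greater).
    static void validate(List<TopicSpec> topics, int defaultNumPartitions, int limit) {
        int total = estimate(topics, defaultNumPartitions);
        if (total > limit) {
            throw new IllegalArgumentException(
                "Batch would create up to " + total + " partitions, limit is " + limit);
        }
    }
}
```

Because the estimate counts topics that may later fail validation, it can overstate what would actually be created, which is the trade-off described above.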