dajac commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1565280518
########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ClientSideAssignorBenchmark.java: ########## @@ -0,0 +1,198 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ClientSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000", "10000"}) + private int memberCount; + + @Param({"true", "false"}) + private 
boolean isRackAware; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isReassignment; + + private Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<>(); + + private final int numBrokerRacks = 3; + + private final int replicationFactor = 2; Review Comment: Could they be static? ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/AssignPartitionsMicroBenchmark.java: ########## @@ -0,0 +1,153 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; +import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.server.common.TopicIdPartition; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations 
= 5) +@Measurement(iterations = 7) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class AssignPartitionsMicroBenchmark { Review Comment: I wonder if we really need to commit this one. It seems to me that it was more a one off vs something that we will re-use in the future to ensure the performance of the assignors. ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ########## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; 
+import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000"}) + private int memberCount; Review Comment: I think that we need to agree on the cases that we want to use here. For the member, it would be great to have something like: 100, 500, 1000, 5000, 10000. For the topics, I wonder if we could have something based on the number of members. For instance, we could say, X partitions per member where X is also a Param. Would it be possible? ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ########## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import 
org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000"}) + private int memberCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isReassignment; + + private PartitionAssignor partitionAssignor; + + private final int numberOfRacks = 3; + + private AssignmentSpec assignmentSpec; + + private SubscribedTopicDescriber subscribedTopicDescriber; + + @Setup(Level.Trial) + public void setup() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + Map<Integer, Set<String>> partitionRacks = isRackAware ? 
+ mkMapOfPartitionRacks(partitionsPerTopicCount) : + Collections.emptyMap(); + + for (int i = 1; i <= topicCount; i++) { + Uuid topicUuid = Uuid.randomUuid(); + String topicName = "topic" + i; + topicMetadata.put(topicUuid, new TopicMetadata( + topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); + } + + addTopicSubscriptions(topicMetadata); + this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + + if (isRangeAssignor) { + this.partitionAssignor = new RangeAssignor(); + } else { + this.partitionAssignor = new UniformAssignor(); + } + + if (isReassignment) { + GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); + Map<String, MemberAssignment> members; + + members = initialAssignment.members(); + + // Update the AssignmentSpec with the results from the initial assignment. + Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>(); + + members.forEach((memberId, memberAssignment) -> { + AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId); + updatedMembers.put(memberId, new AssignmentMemberSpec( + memberSpec.instanceId(), + memberSpec.rackId(), + memberSpec.subscribedTopicIds(), + memberAssignment.targetPartitions() + )); + }); + + // Add new member to trigger a reassignment. + Optional<String> rackId = isRackAware ? Optional.of("rack" + (memberCount + 1) % numberOfRacks) : Optional.empty(); Review Comment: This code is duplicated. Should we have a helper method for it? 
########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ########## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) 
+@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000"}) + private int memberCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isReassignment; + + private PartitionAssignor partitionAssignor; + + private final int numberOfRacks = 3; + + private AssignmentSpec assignmentSpec; + + private SubscribedTopicDescriber subscribedTopicDescriber; + + @Setup(Level.Trial) + public void setup() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + Map<Integer, Set<String>> partitionRacks = isRackAware ? + mkMapOfPartitionRacks(partitionsPerTopicCount) : + Collections.emptyMap(); + + for (int i = 1; i <= topicCount; i++) { + Uuid topicUuid = Uuid.randomUuid(); + String topicName = "topic" + i; + topicMetadata.put(topicUuid, new TopicMetadata( + topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); + } + + addTopicSubscriptions(topicMetadata); + this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + + if (isRangeAssignor) { + this.partitionAssignor = new RangeAssignor(); + } else { + this.partitionAssignor = new UniformAssignor(); + } + + if (isReassignment) { + GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); + Map<String, MemberAssignment> members; + + members = initialAssignment.members(); Review Comment: I suppose that we could remove `members` and inline `initialAssignment.members()` where we use it. 
########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/TargetAssignmentBuilderBenchmark.java: ########## @@ -0,0 +1,259 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.consumer.VersionedMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import 
java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class TargetAssignmentBuilderBenchmark { + + @Param({"1000", "10000"}) + private int memberCount; + + @Param({"10", "50"}) + private int partitionsPerTopicCount; + + @Param({"1000"}) + private int topicCount; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isRackAware; Review Comment: Do we really need those? My understanding of this benchmark is that we want to measure the overhead of the TargetAssignmentBuilder therefore we don't have to measure the assignor itself. We have the other benchmark for this. We should simplify it as much as possible. ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ClientSideAssignorBenchmark.java: ########## @@ -0,0 +1,198 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; 
+import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ClientSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000", "10000"}) + private int memberCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isReassignment; + + private Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<>(); + + private final int numBrokerRacks = 3; + + private final int replicationFactor = 2; + + protected AbstractPartitionAssignor assignor; Review Comment: Should we rather use `ConsumerPartitionAssignor`? 
########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ########## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) 
+@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000"}) + private int memberCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isReassignment; + + private PartitionAssignor partitionAssignor; + + private final int numberOfRacks = 3; + + private AssignmentSpec assignmentSpec; + + private SubscribedTopicDescriber subscribedTopicDescriber; + + @Setup(Level.Trial) + public void setup() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + Map<Integer, Set<String>> partitionRacks = isRackAware ? + mkMapOfPartitionRacks(partitionsPerTopicCount) : + Collections.emptyMap(); + + for (int i = 1; i <= topicCount; i++) { + Uuid topicUuid = Uuid.randomUuid(); + String topicName = "topic" + i; + topicMetadata.put(topicUuid, new TopicMetadata( + topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); + } + + addTopicSubscriptions(topicMetadata); Review Comment: The name of the method is a bit unclear. My understanding is that it creates the assignment spec. Should we call it createAssignmentSpec? 
########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ########## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) 
+@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000"}) + private int memberCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isReassignment; + + private PartitionAssignor partitionAssignor; + + private final int numberOfRacks = 3; Review Comment: Could this be static? ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/AssignPartitionsMicroBenchmark.java: ########## @@ -0,0 +1,153 @@ +package org.apache.kafka.jmh.group_coordinator; Review Comment: We need to add the license. I wonder if we should call the package `assignor`. ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ########## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import 
org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000"}) + private int memberCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isReassignment; + + private PartitionAssignor partitionAssignor; + + private final int numberOfRacks = 3; + + private AssignmentSpec assignmentSpec; + + private SubscribedTopicDescriber subscribedTopicDescriber; + + @Setup(Level.Trial) + public void setup() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + Map<Integer, Set<String>> partitionRacks = isRackAware ? 
+ mkMapOfPartitionRacks(partitionsPerTopicCount) : + Collections.emptyMap(); + + for (int i = 1; i <= topicCount; i++) { + Uuid topicUuid = Uuid.randomUuid(); + String topicName = "topic" + i; + topicMetadata.put(topicUuid, new TopicMetadata( + topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); Review Comment: The style is inconsistent here. ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ########## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import 
java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000"}) + private int memberCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isReassignment; + + private PartitionAssignor partitionAssignor; + + private final int numberOfRacks = 3; + + private AssignmentSpec assignmentSpec; + + private SubscribedTopicDescriber subscribedTopicDescriber; + + @Setup(Level.Trial) + public void setup() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + Map<Integer, Set<String>> partitionRacks = isRackAware ? + mkMapOfPartitionRacks(partitionsPerTopicCount) : + Collections.emptyMap(); + + for (int i = 1; i <= topicCount; i++) { + Uuid topicUuid = Uuid.randomUuid(); + String topicName = "topic" + i; + topicMetadata.put(topicUuid, new TopicMetadata( + topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); + } Review Comment: Should we extract this into a helper method? 
########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ########## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) 
+@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000"}) + private int memberCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isReassignment; + + private PartitionAssignor partitionAssignor; + + private final int numberOfRacks = 3; + + private AssignmentSpec assignmentSpec; + + private SubscribedTopicDescriber subscribedTopicDescriber; + + @Setup(Level.Trial) + public void setup() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + Map<Integer, Set<String>> partitionRacks = isRackAware ? + mkMapOfPartitionRacks(partitionsPerTopicCount) : + Collections.emptyMap(); + + for (int i = 1; i <= topicCount; i++) { + Uuid topicUuid = Uuid.randomUuid(); + String topicName = "topic" + i; + topicMetadata.put(topicUuid, new TopicMetadata( + topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); + } + + addTopicSubscriptions(topicMetadata); + this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); Review Comment: nit: We usually don't use `this` except in constructors or if there is a real need for it. 
########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ########## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) 
+@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000"}) + private int memberCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isReassignment; + + private PartitionAssignor partitionAssignor; + + private final int numberOfRacks = 3; + + private AssignmentSpec assignmentSpec; + + private SubscribedTopicDescriber subscribedTopicDescriber; + + @Setup(Level.Trial) + public void setup() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + Map<Integer, Set<String>> partitionRacks = isRackAware ? + mkMapOfPartitionRacks(partitionsPerTopicCount) : + Collections.emptyMap(); + + for (int i = 1; i <= topicCount; i++) { + Uuid topicUuid = Uuid.randomUuid(); + String topicName = "topic" + i; + topicMetadata.put(topicUuid, new TopicMetadata( + topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); + } + + addTopicSubscriptions(topicMetadata); + this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + + if (isRangeAssignor) { + this.partitionAssignor = new RangeAssignor(); + } else { + this.partitionAssignor = new UniformAssignor(); + } + + if (isReassignment) { Review Comment: I wonder if we should rather call this "incremental assignment". What do you think? 
########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ########## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) 
+@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000"}) + private int memberCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isReassignment; Review Comment: I wonder if there is a better way to express this. For instance, `isRangeAssignor` works today because we have only two assignors, `isSubscriptionUniform` mainly makes sense for the uniform assignor. I wonder if we could reuse some ideas (e.g. the enum) from `AuthorizerBenchmark`. Have you considered something like this? ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/TargetAssignmentBuilderBenchmark.java: ########## @@ -0,0 +1,259 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import 
org.apache.kafka.coordinator.group.consumer.VersionedMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class TargetAssignmentBuilderBenchmark { + + @Param({"1000", "10000"}) + private int memberCount; + + @Param({"10", "50"}) + private int partitionsPerTopicCount; + + @Param({"1000"}) + private int topicCount; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isRackAware; + + /** + * The group Id. + */ + String groupId = "benchmark-group"; + + /** + * The group epoch. + */ + private final int groupEpoch = 0; + + /** + * The partition partitionAssignor used to compute the assignment. + */ + private PartitionAssignor partitionAssignor; + + /** + * The members in the group. + */ + private Map<String, ConsumerGroupMember> members = Collections.emptyMap(); + + /** + * The subscription metadata. 
+ */ + private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap(); + + /** + * The existing target assignment. + */ + private Map<String, Assignment> existingTargetAssignment = Collections.emptyMap(); + + private TargetAssignmentBuilder targetAssignmentBuilder; + + private AssignmentSpec assignmentSpec; + + int numberOfRacks = 3; Review Comment: static? ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ########## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; 
+import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000"}) + private int memberCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isReassignment; + + private PartitionAssignor partitionAssignor; + + private final int numberOfRacks = 3; + + private AssignmentSpec assignmentSpec; + + private SubscribedTopicDescriber subscribedTopicDescriber; + + @Setup(Level.Trial) + public void setup() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + Map<Integer, Set<String>> partitionRacks = isRackAware ? 
+ mkMapOfPartitionRacks(partitionsPerTopicCount) : + Collections.emptyMap(); + + for (int i = 1; i <= topicCount; i++) { + Uuid topicUuid = Uuid.randomUuid(); + String topicName = "topic" + i; + topicMetadata.put(topicUuid, new TopicMetadata( + topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); + } + + addTopicSubscriptions(topicMetadata); + this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + + if (isRangeAssignor) { + this.partitionAssignor = new RangeAssignor(); + } else { + this.partitionAssignor = new UniformAssignor(); + } + + if (isReassignment) { + GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); + Map<String, MemberAssignment> members; + + members = initialAssignment.members(); + + // Update the AssignmentSpec with the results from the initial assignment. + Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>(); + + members.forEach((memberId, memberAssignment) -> { + AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId); + updatedMembers.put(memberId, new AssignmentMemberSpec( + memberSpec.instanceId(), + memberSpec.rackId(), + memberSpec.subscribedTopicIds(), + memberAssignment.targetPartitions() + )); + }); + + // Add new member to trigger a reassignment. + Optional<String> rackId = isRackAware ? Optional.of("rack" + (memberCount + 1) % numberOfRacks) : Optional.empty(); + + updatedMembers.put("newMember", new AssignmentMemberSpec( + Optional.empty(), + rackId, + topicMetadata.keySet(), + Collections.emptyMap() + )); + + this.assignmentSpec = new AssignmentSpec(updatedMembers); + } + } + + private Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) { Review Comment: It looks like this method could be static. 
########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/TargetAssignmentBuilderBenchmark.java: ########## @@ -0,0 +1,259 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.consumer.VersionedMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import 
java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class TargetAssignmentBuilderBenchmark { + + @Param({"1000", "10000"}) + private int memberCount; + + @Param({"10", "50"}) + private int partitionsPerTopicCount; + + @Param({"1000"}) + private int topicCount; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isRackAware; + + /** + * The group Id. + */ + String groupId = "benchmark-group"; + + /** + * The group epoch. + */ + private final int groupEpoch = 0; + + /** + * The partition partitionAssignor used to compute the assignment. + */ + private PartitionAssignor partitionAssignor; + + /** + * The members in the group. + */ + private Map<String, ConsumerGroupMember> members = Collections.emptyMap(); + + /** + * The subscription metadata. + */ + private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap(); + + /** + * The existing target assignment. + */ + private Map<String, Assignment> existingTargetAssignment = Collections.emptyMap(); + + private TargetAssignmentBuilder targetAssignmentBuilder; + + private AssignmentSpec assignmentSpec; + + int numberOfRacks = 3; + + List<String> allTopicNames = new ArrayList<>(topicCount); + List<Uuid> allTopicIds = new ArrayList<>(topicCount); + + @Setup(Level.Trial) + public void setup() { + + if (isRangeAssignor) { + this.partitionAssignor = new RangeAssignor(); + } else { + this.partitionAssignor = new UniformAssignor(); + } + + this.subscriptionMetadata = generateMockSubscriptionMetadata(); + + this.members = generateMockMembers(); + + this.existingTargetAssignment = generateMockInitialTargetAssignment(); + + // Add a new member to trigger a rebalance. 
+ Set<String> subscribedTopics = new HashSet<>(subscriptionMetadata.keySet()); + String rackId = isRackAware ? "rack" + (memberCount + 1) % numberOfRacks : ""; + ConsumerGroupMember new_member = new ConsumerGroupMember.Builder("new-member") Review Comment: nit: The variable does not respect our naming conventions. ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ########## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + + @Param({"10", "50", "100"}) + private int partitionsPerTopicCount; + + @Param({"100"}) + private int topicCount; + + @Param({"500", "1000"}) + private int memberCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"true", "false"}) + private boolean isSubscriptionUniform; + + @Param({"true", "false"}) + private boolean isRangeAssignor; + + @Param({"true", "false"}) + private boolean isReassignment; + + private PartitionAssignor partitionAssignor; + + private final int numberOfRacks = 3; + + private AssignmentSpec assignmentSpec; + + private SubscribedTopicDescriber subscribedTopicDescriber; + + @Setup(Level.Trial) + public void setup() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + Map<Integer, Set<String>> partitionRacks = isRackAware ? 
+ mkMapOfPartitionRacks(partitionsPerTopicCount) : + Collections.emptyMap(); + + for (int i = 1; i <= topicCount; i++) { + Uuid topicUuid = Uuid.randomUuid(); + String topicName = "topic" + i; + topicMetadata.put(topicUuid, new TopicMetadata( + topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); + } + + addTopicSubscriptions(topicMetadata); + this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + + if (isRangeAssignor) { + this.partitionAssignor = new RangeAssignor(); + } else { + this.partitionAssignor = new UniformAssignor(); + } + + if (isReassignment) { + GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); + Map<String, MemberAssignment> members; + + members = initialAssignment.members(); + + // Update the AssignmentSpec with the results from the initial assignment. + Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>(); + + members.forEach((memberId, memberAssignment) -> { + AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId); + updatedMembers.put(memberId, new AssignmentMemberSpec( + memberSpec.instanceId(), + memberSpec.rackId(), + memberSpec.subscribedTopicIds(), + memberAssignment.targetPartitions() + )); + }); + + // Add new member to trigger a reassignment. + Optional<String> rackId = isRackAware ? 
Optional.of("rack" + (memberCount + 1) % numberOfRacks) : Optional.empty(); + + updatedMembers.put("newMember", new AssignmentMemberSpec( + Optional.empty(), + rackId, + topicMetadata.keySet(), + Collections.emptyMap() + )); + + this.assignmentSpec = new AssignmentSpec(updatedMembers); + } + } + + private Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) { + Map<Integer, Set<String>> partitionRacks = new HashMap<>(numPartitions); + for (int i = 0; i < numPartitions; i++) { + partitionRacks.put(i, new HashSet<>(Arrays.asList("rack" + i % numberOfRacks, "rack" + (i + 1) % numberOfRacks))); + } + return partitionRacks; + } + + private void addTopicSubscriptions(Map<Uuid, TopicMetadata> topicMetadata) { + Map<String, AssignmentMemberSpec> members = new TreeMap<>(); + List<Uuid> allTopicIds = new ArrayList<>(topicMetadata.keySet()); + int topicCounter = 0; + + for (int i = 0; i < memberCount; i++) { + String memberName = "member" + i; + Optional<String> rackId = isRackAware ? Optional.of("rack" + i % numberOfRacks) : Optional.empty(); + List<Uuid> subscribedTopicIds; + + // When subscriptions are uniform, all members are assigned all topics. + if (isSubscriptionUniform) { + subscribedTopicIds = allTopicIds; + } else { + subscribedTopicIds = Arrays.asList( + allTopicIds.get(i % topicCount), + allTopicIds.get((i+1) % topicCount) + ); + topicCounter = max (topicCounter, ((i+1) % topicCount)); + + if (i == memberCount - 1 && topicCounter < topicCount - 1) { + subscribedTopicIds.addAll(allTopicIds.subList(topicCounter + 1, topicCount - 1)); + } Review Comment: I don't fully understand your selection logic. Could you elaborate? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org