LiamClarkeNZ commented on code in PR #19850:
URL: https://github.com/apache/kafka/pull/19850#discussion_r2403746075
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java:
##########
@@ -77,7 +86,7 @@ public void testStickyPartitioning() {
assertNotEquals(partA,
builtInPartitionerA.peekCurrentPartitionInfo(testCluster).partition());
// Check that switching works even when there is one partition.
- BuiltInPartitioner builtInPartitionerB = new
SequentialPartitioner(logContext, TOPIC_B, 1);
+ BuiltInPartitioner builtInPartitionerB = new
SequentialPartitioner(logContext, TOPIC_B, 1, false, "");
Review Comment:
```suggestion
BuiltInPartitioner builtInPartitionerB = new
SequentialPartitioner(logContext, TOPIC_B, 1, rackAware, clientRackId);
```
Just to make the meaning of `false` and `""` obvious at a glance :)
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java:
##########
@@ -58,7 +67,7 @@ public void testStickyPartitioning() {
Collections.emptySet(), Collections.emptySet());
// Create partitions with "sticky" batch size to accommodate 3 records.
- BuiltInPartitioner builtInPartitionerA = new
SequentialPartitioner(logContext, TOPIC_A, 3);
+ BuiltInPartitioner builtInPartitionerA = new
SequentialPartitioner(logContext, TOPIC_A, 3, false, "");
Review Comment:
```suggestion
final boolean rackAware = false;
final String clientRackId = "";
BuiltInPartitioner builtInPartitionerA = new
SequentialPartitioner(logContext, TOPIC_A, 3, rackAware, clientRackId);
```
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -40,8 +41,10 @@ public class BuiltInPartitioner {
private final Logger log;
private final String topic;
private final int stickyBatchSize;
+ private final boolean rackAware;
+ private final String rack;
- private volatile PartitionLoadStats partitionLoadStats = null;
+ private volatile PartitionLoadStatsHolder partitionLoadStats = null;
Review Comment:
```suggestion
private volatile PartitionLoadStatsHolder partitionLoadStatsHolder =
null;
```
As it got a bit confusing when later the code is retrieving
`partitionLoadStats.inThisRack` which is a member of `PartitionLoadStatsHolder`
of type `PartitionLoadStats`. :D
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java:
##########
@@ -144,22 +245,34 @@ public void unavailablePartitionsTest() {
assertEquals(0, partC);
}
- @Test
- public void adaptivePartitionsTest() {
- BuiltInPartitioner builtInPartitioner = new
SequentialPartitioner(logContext, TOPIC_A, 1);
+ @ParameterizedTest
+ // All these cases exclude rack-aware partitioning,
+ // but ensure various combinations of broker and client rack settings
don't cause problems.
+ @CsvSource({
+ "false,false,",
+ "true,false,",
+ "false,true,rack0",
+ })
+ public void adaptivePartitionsTest(boolean brokerRacksArePresent, boolean
clientRackAware, String clientRack) {
+ BuiltInPartitioner builtInPartitioner = new
SequentialPartitioner(logContext, TOPIC_A, 1, clientRackAware, clientRack);
// Simulate partition queue sizes.
int[] queueSizes = {5, 0, 3, 0, 1};
int[] partitionIds = new int[queueSizes.length];
+ String[] partitionRacks = new String[queueSizes.length];
int[] expectedFrequencies = new int[queueSizes.length];
List<PartitionInfo> allPartitions = new ArrayList<>();
for (int i = 0; i < partitionIds.length; i++) {
+ final Node leader = NODES[i % NODES.length];
partitionIds[i] = i;
- allPartitions.add(new PartitionInfo(TOPIC_A, i, NODES[i %
NODES.length], NODES, NODES));
+ if (brokerRacksArePresent) {
+ partitionRacks[i] = leader.rack();
+ }
+ allPartitions.add(new PartitionInfo(TOPIC_A, i, leader, NODES,
NODES));
expectedFrequencies[i] = 6 - queueSizes[i]; // 6 is
max(queueSizes) + 1
Review Comment:
Love the comment :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]