chia7712 commented on code in PR #16277:
URL: https://github.com/apache/kafka/pull/16277#discussion_r1634581946
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -45,7 +45,7 @@ public class BuiltInPartitioner {
private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo =
new AtomicReference<>();
// Visible and used for testing only.
- static volatile public Supplier<Integer> mockRandom = null;
+ protected static volatile Supplier<Integer> mockRandom = null;
Review Comment:
Agreed, that could introduce some changes, but we can minimize them by
opening up only the required methods. For example:
**BuiltInPartitioner**
```java
int random() {
    return Utils.toPositive(ThreadLocalRandom.current().nextInt());
}

private int nextPartition(Cluster cluster) {
    int random = random();
    ...
}
```
**RecordAccumulator**
```java
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    TopicInfo topicInfo = topicInfoMap.computeIfAbsent(tp.topic(),
        k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));
    return topicInfo.batches.computeIfAbsent(tp.partition(), k -> new ArrayDeque<>());
}

BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) {
    return new BuiltInPartitioner(logContext, topic, stickyBatchSize);
}

public TopicInfo(BuiltInPartitioner builtInPartitioner) {
    this.builtInPartitioner = builtInPartitioner;
}
```
With those overridable methods, both `RecordAccumulatorTest` and
`BuiltInPartitionerTest` can create a `BuiltInPartitioner` with a custom
`random` method, so the static `mockRandom` field is no longer needed. WDYT?
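
To make the idea concrete, here is a minimal, self-contained sketch of how a test could pin the random value by overriding these methods. `SketchPartitioner` and `SketchAccumulator` are hypothetical stand-ins, not the real Kafka classes, and the modulo-based `nextPartition` is a simplification rather than the actual partition selection logic. The sign-bit mask is assumed to match what `Utils.toPositive` does.

```java
import java.util.concurrent.ThreadLocalRandom;

public class OverridableRandomSketch {

    // Hypothetical stand-in for BuiltInPartitioner (not the real class).
    static class SketchPartitioner {
        private final int numPartitions;

        SketchPartitioner(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        // Package-private hook: tests override this instead of setting a static field.
        int random() {
            // Mask the sign bit so the result is never negative.
            return ThreadLocalRandom.current().nextInt() & 0x7fffffff;
        }

        // Simplified selection: the real logic is more involved.
        int nextPartition() {
            return random() % numPartitions;
        }
    }

    // Hypothetical stand-in for RecordAccumulator with an overridable factory method.
    static class SketchAccumulator {
        SketchPartitioner createPartitioner(int numPartitions) {
            return new SketchPartitioner(numPartitions);
        }

        int pick(int numPartitions) {
            return createPartitioner(numPartitions).nextPartition();
        }
    }

    // What a partitioner test could do: subclass and fix the random value.
    public static int partitionWithFixedRandom(int fixedRandom, int numPartitions) {
        SketchPartitioner partitioner = new SketchPartitioner(numPartitions) {
            @Override
            int random() {
                return fixedRandom;
            }
        };
        return partitioner.nextPartition();
    }

    // What an accumulator test could do: override the factory to inject
    // a partitioner whose random() is fixed.
    public static int partitionViaAccumulator(int fixedRandom, int numPartitions) {
        SketchAccumulator accumulator = new SketchAccumulator() {
            @Override
            SketchPartitioner createPartitioner(int n) {
                return new SketchPartitioner(n) {
                    @Override
                    int random() {
                        return fixedRandom;
                    }
                };
            }
        };
        return accumulator.pick(numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionWithFixedRandom(6, 4)); // 6 % 4 = 2
        System.out.println(partitionViaAccumulator(7, 3));  // 7 % 3 = 1
    }
}
```

Because each test builds its own instance, there is no shared static state to reset between tests, which is the main benefit over the `mockRandom` field.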