squah-confluent commented on code in PR #20847:
URL: https://github.com/apache/kafka/pull/20847#discussion_r2580158133
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -772,13 +780,18 @@ private void freeCurrentBatch() {
// Cancel the linger timeout.
currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
- // Release the buffer only if it is not larger than the
maxBatchSize.
- int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+ // Release the buffer only if it is not larger than the max buffer
size.
+ int maxBufferSize = appendMaxBufferSizeSupplier.get();
- if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
+ if (currentBatch.builder.buffer().capacity() <= maxBufferSize) {
bufferSupplier.release(currentBatch.builder.buffer());
- } else if (currentBatch.buffer.capacity() <= maxBatchSize) {
+ cachedBufferSize.set(currentBatch.builder.buffer().capacity());
+ } else if (currentBatch.buffer.capacity() <= maxBufferSize) {
bufferSupplier.release(currentBatch.buffer);
+ cachedBufferSize.set(currentBatch.buffer.capacity());
Review Comment:
Should we also bump the discard counter when the builder expands the buffer
above the maxBufferSize and we return the unexpanded buffer to the supplier?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -413,6 +426,8 @@ public class GroupCoordinatorConfig {
private final int streamsGroupMaxStandbyReplicas;
private final int streamsGroupInitialRebalanceDelayMs;
+ private final AbstractConfig config;
Review Comment:
nit: Could we move this to the top of the class? Same with the assignment in
the constructor.
##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3127,16 +3178,9 @@ public void testAppendRecordBatchSize() {
}
@Test
- public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() {
+ public void testCoordinatorDoNotRetainBufferLargeThanMaxBufferSize() {
Review Comment:
Could we assert that `recordAppendBufferDiscarded` is called on a mock
`CoordinatorRuntimeMetrics`? The same for
`testBufferShrinkWhenMaxBufferSizeReducedBelowBatchSize`.
##########
docs/upgrade.html:
##########
@@ -19,16 +19,26 @@
<script id="upgrade-template" type="text/x-handlebars-template">
+<h4><a id="upgrade_4_3_0" href="#upgrade_4_3_0">Upgrading to 4.3.0</a></h4>
+
+<h5><a id="upgrade_4_3_0_from" href="#upgrade_4_3_0_from">Upgrading Servers to
4.3.0 from any version 3.3.x through 4.2.0</a></h5>
+
+<h5><a id="upgrade_430_notable" href="#upgrade_430_notable">Notable changes in
4.3.0</a></h5>
+<ul>
+ <li>
+ Two new configs have been introduced:
+ <code>group.coordinator.append.max.buffer.size</code> in
<code>GroupCoordinatorConfig</code> and
<code>share.coordinator.append.max.buffer.size</code> in
<code>ShareCoordinatorConfig</code>.
Review Comment:
Cluster operators won't be reading the code. Maybe we can remove the
reference to `GroupCoordinatorConfig` and `ShareCoordinatorConfig`?
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -120,6 +120,7 @@ public static class Builder<S extends CoordinatorShard<U>,
U> {
private Compression compression;
private OptionalInt appendLingerMs;
private ExecutorService executorService;
+ private Supplier<Integer> appendMaxBufferSizeSupplier;
Review Comment:
nit: Could we move this up just after `appendLingerMs`? It'd be nice to
group the `append...` options together. The same for
`withMaxBufferSizeSupplier`.
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -2109,7 +2126,8 @@ private CoordinatorRuntime(
Serializer<U> serializer,
Compression compression,
OptionalInt appendLingerMs,
- ExecutorService executorService
+ ExecutorService executorService,
+ Supplier<Integer> appendMaxBufferSizeSupplier
Review Comment:
nit: Could we move this parameter up, just after `appendLingerMs`? So the
`append...` parameters are grouped together. The same for the assignment in the
constructor.
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -614,6 +616,11 @@ class CoordinatorContext {
*/
int batchEpoch;
+ /**
+ * The cached buffer size.
+ */
+ AtomicLong cachedBufferSize;
+
Review Comment:
nit: Could we move this just after `bufferSupplier`? Since they're related.
##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -2851,7 +2899,6 @@ public void
testHighWatermarkUpdateWithDeferredEventExceptions() throws Executio
MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter();
CoordinatorRuntimeMetrics metrics =
mock(CoordinatorRuntimeMetrics.class);
-
Review Comment:
nit: stray change
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -196,6 +197,11 @@ public Builder<S, U> withExecutorService(ExecutorService
executorService) {
return this;
}
+ public Builder<S, U> withMaxBufferSizeSupplier(Supplier<Integer>
maxBufferSizeSupplier) {
+ this.appendMaxBufferSizeSupplier = maxBufferSizeSupplier;
Review Comment:
nit: Could we name this `withAppendMaxBufferSizeSupplier`? for consistency
```suggestion
public Builder<S, U>
withAppendMaxBufferSizeSupplier(Supplier<Integer> appendMaxBufferSizeSupplier) {
this.appendMaxBufferSizeSupplier = appendMaxBufferSizeSupplier;
```
##########
docs/upgrade.html:
##########
@@ -19,16 +19,26 @@
<script id="upgrade-template" type="text/x-handlebars-template">
+<h4><a id="upgrade_4_3_0" href="#upgrade_4_3_0">Upgrading to 4.3.0</a></h4>
Review Comment:
Could we also add documentation for the new metrics to ops.html?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]