squah-confluent commented on code in PR #20847:
URL: https://github.com/apache/kafka/pull/20847#discussion_r2580158133


##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -772,13 +780,18 @@ private void freeCurrentBatch() {
             // Cancel the linger timeout.
             currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
 
-            // Release the buffer only if it is not larger than the 
maxBatchSize.
-            int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+            // Release the buffer only if it is not larger than the max buffer 
size.
+            int maxBufferSize = appendMaxBufferSizeSupplier.get();
 
-            if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
+            if (currentBatch.builder.buffer().capacity() <= maxBufferSize) {
                 bufferSupplier.release(currentBatch.builder.buffer());
-            } else if (currentBatch.buffer.capacity() <= maxBatchSize) {
+                cachedBufferSize.set(currentBatch.builder.buffer().capacity());
+            } else if (currentBatch.buffer.capacity() <= maxBufferSize) {
                 bufferSupplier.release(currentBatch.buffer);
+                cachedBufferSize.set(currentBatch.buffer.capacity());

Review Comment:
   Should we also bump the discard counter when the builder expands the buffer 
above the maxBufferSize and we return the unexpanded buffer to the supplier?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -413,6 +426,8 @@ public class GroupCoordinatorConfig {
     private final int streamsGroupMaxStandbyReplicas;
     private final int streamsGroupInitialRebalanceDelayMs;
 
+    private final AbstractConfig config;

Review Comment:
   nit: Could we move this to the top of the class? Same with the assignment in 
the constructor.



##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3127,16 +3178,9 @@ public void testAppendRecordBatchSize() {
     }
 
     @Test
-    public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() {
+    public void testCoordinatorDoNotRetainBufferLargeThanMaxBufferSize() {

Review Comment:
   Could we assert that `recordAppendBufferDiscarded` is called on a mock 
`CoordinatorRuntimeMetrics`? The same for 
`testBufferShrinkWhenMaxBufferSizeReducedBelowBatchSize`.



##########
docs/upgrade.html:
##########
@@ -19,16 +19,26 @@
 
 <script id="upgrade-template" type="text/x-handlebars-template">
 
+<h4><a id="upgrade_4_3_0" href="#upgrade_4_3_0">Upgrading to 4.3.0</a></h4>
+
+<h5><a id="upgrade_4_3_0_from" href="#upgrade_4_3_0_from">Upgrading Servers to 
4.3.0 from any version 3.3.x through 4.2.0</a></h5>
+
+<h5><a id="upgrade_430_notable" href="#upgrade_430_notable">Notable changes in 
4.3.0</a></h5>
+<ul>
+    <li>
+        Two new configs have been introduced:
+        <code>group.coordinator.append.max.buffer.size</code> in 
<code>GroupCoordinatorConfig</code> and 
<code>share.coordinator.append.max.buffer.size</code> in 
<code>ShareCoordinatorConfig</code>.

Review Comment:
   Cluster operators won't be reading the code. Maybe we can remove the 
reference to `GroupCoordinatorConfig` and `ShareCoordinatorConfig`?



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -120,6 +120,7 @@ public static class Builder<S extends CoordinatorShard<U>, 
U> {
         private Compression compression;
         private OptionalInt appendLingerMs;
         private ExecutorService executorService;
+        private Supplier<Integer> appendMaxBufferSizeSupplier;

Review Comment:
   nit: Could we move this up just after `appendLingerMs`? It'd be nice to 
group the `append...` options together. The same for 
`withMaxBufferSizeSupplier`.



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -2109,7 +2126,8 @@ private CoordinatorRuntime(
         Serializer<U> serializer,
         Compression compression,
         OptionalInt appendLingerMs,
-        ExecutorService executorService
+        ExecutorService executorService,
+        Supplier<Integer> appendMaxBufferSizeSupplier

Review Comment:
   nit: Could we move this parameter up, just after `appendLingerMs`? So the 
`append...` parameters are grouped together. The same for the assignment in the 
constructor.



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -614,6 +616,11 @@ class CoordinatorContext {
          */
         int batchEpoch;
 
+        /**
+         * The cached buffer size.
+         */
+        AtomicLong cachedBufferSize;
+

Review Comment:
   nit: Could we move this just after `bufferSupplier`? Since they're related.



##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -2851,7 +2899,6 @@ public void 
testHighWatermarkUpdateWithDeferredEventExceptions() throws Executio
         MockTimer timer = new MockTimer();
         MockPartitionWriter writer = new MockPartitionWriter();
         CoordinatorRuntimeMetrics metrics = 
mock(CoordinatorRuntimeMetrics.class);
-

Review Comment:
   nit: stray change



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -196,6 +197,11 @@ public Builder<S, U> withExecutorService(ExecutorService 
executorService) {
             return this;
         }
 
+        public Builder<S, U> withMaxBufferSizeSupplier(Supplier<Integer> 
maxBufferSizeSupplier) {
+            this.appendMaxBufferSizeSupplier = maxBufferSizeSupplier;

Review Comment:
   nit: Could we name this `withAppendMaxBufferSizeSupplier`? for consistency
   ```suggestion
           public Builder<S, U> 
withAppendMaxBufferSizeSupplier(Supplier<Integer> appendMaxBufferSizeSupplier) {
               this.appendMaxBufferSizeSupplier = appendMaxBufferSizeSupplier;
   ```



##########
docs/upgrade.html:
##########
@@ -19,16 +19,26 @@
 
 <script id="upgrade-template" type="text/x-handlebars-template">
 
+<h4><a id="upgrade_4_3_0" href="#upgrade_4_3_0">Upgrading to 4.3.0</a></h4>

Review Comment:
   Could we also add documentation for the new metrics to ops.html?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to