Yunyung commented on code in PR #20040:
URL: https://github.com/apache/kafka/pull/20040#discussion_r2194771745


##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -2937,9 +2939,206 @@ public void testAppendRecordBatchSize() {
         assertFalse(write1.isCompletedExceptionally());
 
         int batchSize = writer.entries(TP).get(0).sizeInBytes();
-        assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
+        assertTrue(batchSize > INITIAL_BUFFER_SIZE && batchSize < maxBatchSize);
     }
 
+    @Test
+    public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() {
+        MockTimer timer = new MockTimer();
+        InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return new LogConfig(Map.of(
+                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+                ));
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Generate a record larger than the maxBatchSize.
+        List<String> largeRecords = List.of("A".repeat(100 * 1024 * 1024));
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(largeRecords, "response1", null, true, false)
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        // Verify that the next buffer retrieved from the bufferSupplier is the initial small one, not the large buffer.
+        assertEquals(INITIAL_BUFFER_SIZE, ctx.bufferSupplier.get(1).capacity());
+    }
+
+    @Test
+    public void testCoordinatorRetainExpandedBufferLessOrEqualToMaxMessageSize() {
+        MockTimer timer = new MockTimer();
+        InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return new LogConfig(Map.of(
+                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024 * 1024) // 1GB
+                ));
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 1000000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        // Verify that the next buffer retrieved from the bufferSupplier is the expanded buffer.
+        assertTrue(ctx.bufferSupplier.get(1).capacity() > INITIAL_BUFFER_SIZE);
+    }
+
+    @Test
+    public void testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize() {
+        MockTimer timer = new MockTimer();
+        var mockWriter = new InMemoryPartitionWriter(false) {
+            private LogConfig config = new LogConfig(Map.of(
+                TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+            ));
+
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return config;
+            }
+
+            public void updateConfig(LogConfig newConfig) {
+                this.config = newConfig;
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 1000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+

Review Comment:
   ditto
   
   ```suggestion
           int batchSize = writer.entries(TP).get(0).sizeInBytes();
           assertTrue(records <= batchSize);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to