mingyen066 commented on code in PR #20040: URL: https://github.com/apache/kafka/pull/20040#discussion_r2180892266
########## coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java: ########## @@ -2937,9 +2939,208 @@ public void testAppendRecordBatchSize() { assertFalse(write1.isCompletedExceptionally()); int batchSize = writer.entries(TP).get(0).sizeInBytes(); - assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize); + assertTrue(batchSize > INITIAL_BUFFER_SIZE && batchSize < maxBatchSize); } + @Test + public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() { + MockTimer timer = new MockTimer(); + InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) { + @Override + public LogConfig config(TopicPartition tp) { + return new LogConfig(Map.of( + TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB + )); + } + }; + StringSerializer serializer = new StringSerializer(); + + CoordinatorRuntime<MockCoordinatorShard, String> runtime = + new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(mockWriter) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withSerializer(serializer) + .withExecutorService(mock(ExecutorService.class)) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); + + // Generate a record larger than the maxBatchSize. 
+ List<String> largeRecords = List.of("A".repeat(100 * 1024 * 1024)); + + // Write #1. + CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, + state -> new CoordinatorResult<>(largeRecords, "response1", null, true, false) + ); + + // Verify that the write has not completed exceptionally. + // This will catch any exceptions thrown including RecordTooLargeException. + assertFalse(write1.isCompletedExceptionally()); + + // Verify that the next buffer retrieved from the bufferSupplier is the initial small one, not the large buffer. + assertEquals(INITIAL_BUFFER_SIZE, ctx.bufferSupplier.get(1).capacity()); + } + + @Test + public void testCoordinatorRetainExpandedBufferLessOrEqualToMaxMessageSize() { + MockTimer timer = new MockTimer(); + InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) { + @Override + public LogConfig config(TopicPartition tp) { + return new LogConfig(Map.of( + TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024 * 1024) // 1GB + )); + } + }; + StringSerializer serializer = new StringSerializer(); + + CoordinatorRuntime<MockCoordinatorShard, String> runtime = + new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(mockWriter) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withSerializer(serializer) + .withExecutorService(mock(ExecutorService.class)) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. 
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); + + // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize + List<String> records = new ArrayList<>(); + for (int i = 0; i < 1000000; i++) { + records.add("record-" + i); + } + + // Write #1. + CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, + state -> new CoordinatorResult<>(records, "response1") + ); + + // Verify that the write has not completed exceptionally. + // This will catch any exceptions thrown including RecordTooLargeException. + assertFalse(write1.isCompletedExceptionally()); + + // Verify that the next buffer retrieved from the bufferSupplier is the expanded buffer. + assertTrue(ctx.bufferSupplier.get(1).capacity() > INITIAL_BUFFER_SIZE); + } + + @Test + public void testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize() { + MockTimer timer = new MockTimer(); + var mockWriter = new InMemoryPartitionWriter(false) { + private LogConfig config; + + { + config = new LogConfig(Map.of( + TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB + )); + } + + @Override + public LogConfig config(TopicPartition tp) { + return config; + } + + public void updateConfig(LogConfig newConfig) { + this.config = newConfig; + } + }; + StringSerializer serializer = new StringSerializer(); + + CoordinatorRuntime<MockCoordinatorShard, String> runtime = + new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(mockWriter) + 
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withSerializer(serializer) + .withExecutorService(mock(ExecutorService.class)) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); + + List<String> records = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + records.add("record-" + i); + } + + // Write #1. + CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, + state -> new CoordinatorResult<>(records, "response1") + ); + + // Verify that the write has not completed exceptionally. + // This will catch any exceptions thrown including RecordTooLargeException. + assertFalse(write1.isCompletedExceptionally()); + + ByteBuffer cachedBuffer = ctx.bufferSupplier.get(1); + assertEquals(INITIAL_BUFFER_SIZE, cachedBuffer.capacity()); + ctx.bufferSupplier.release(cachedBuffer); + + // Reduce max message size below initial buffer size. + mockWriter.updateConfig(new LogConfig( + Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(INITIAL_BUFFER_SIZE - 66)))); + + // Write #2. + CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, + state -> new CoordinatorResult<>(records, "response2") + ); + assertFalse(write2.isCompletedExceptionally()); + + // Verify that there is no cached buffer. + assertEquals(1, ctx.bufferSupplier.get(1).capacity()); + + // Write #3. 
Review Comment:
I replaced the previous assertion with `assertEquals(mockWriter.config(TP).maxMessageSize(), ctx.bufferSupplier.get(1).capacity())` to ensure that [remainder of the comment is truncated in this message]

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org