shashankhs11 commented on code in PR #20285:
URL: https://github.com/apache/kafka/pull/20285#discussion_r2409117239
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1066,6 +1066,146 @@ public void testSplitAndReenqueue() throws
ExecutionException, InterruptedExcept
assertEquals(1, future2.get().offset());
}
+ // This test verifies that splitting a single large record
+ // creates an unsplittable batch (it does not actually split it)
+ // that will continue to fail with MESSAGE_TOO_LARGE,
+ // causing infinite retry loops
+ @Test
+ public void testSplitAndReenqueueWithSingleLargeRecord() throws
ExecutionException, InterruptedException {
+ long now = time.milliseconds();
+ int smallBatchSize = 1024;
+ RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize,
10 * 1024, Compression.NONE, 10);
+
+ // create a single record that is much larger than the batch size limit
+ // we are mimicking sending a record larger than the broker's
message.max.bytes
+ byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB
+
+ // Create a buffer with enough space for the large record
+ ByteBuffer buffer = ByteBuffer.allocate(8192);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
Compression.NONE, TimestampType.CREATE_TIME, 0L);
+ ProducerBatch batch = new ProducerBatch(tp1, builder, now, true);
+
+ final AtomicInteger acked = new AtomicInteger(0);
+ Callback cb = (metadata, exception) -> acked.incrementAndGet();
+
+ // create a large batch but only with one single record
+ Future<RecordMetadata> future = batch.tryAppend(now, key, largeValue,
Record.EMPTY_HEADERS, cb, now);
+ assertNotNull(future, "Should be able to append the large record to
batch");
+ assertEquals(1, batch.recordCount, "Batch should contain exactly one
record");
+ batch.close();
+
+ // try to split and reenqueue a single large record
+ SplitAndReenqueueResult result = performSplitAndReenqueueCycle(accum,
batch, 0);
+
+ // The below asserts tests that the single large record
+ // results in exactly one "split" batch
+ assertEquals(1, result.numSplitBatches, "Single large record should
result in exactly one split batch");
+ assertEquals(1, result.originalRecordCount, "Original batch should
have exactly one record");
+ assertEquals(1, result.splitBatch.recordCount, "Split batch should
still contain exactly one record");
+ assertTrue(result.originalBatchSize > smallBatchSize, "Original batch
should exceed batch size limit");
+
+ // the "split" batch is still oversized and contains the same record
+ assertTrue(result.splitBatch.estimatedSizeInBytes() > smallBatchSize,
+ "Split batch is still oversized - it cannot be split further and
will cause an error, will retry infinitely");
+ }
+
+ // This test would retry infinitely (limited to 5 cycles for
testing)
+ // because the record can never be split further
+ @Test
+ public void testRetrySplitAndReenqueueBehaviourWithSingleLargeRecord()
throws ExecutionException, InterruptedException {
+ long now = time.milliseconds();
+ int smallBatchSize = 1024;
+ RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize,
10 * 1024, Compression.NONE, 10);
+
+ // create a single record that is much larger than the batch size limit
+ // we are mimicking sending a record larger than the broker's
message.max.bytes
+ byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB
+
+ // Create a buffer with enough space for the large record
+ ByteBuffer buffer = ByteBuffer.allocate(8192);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
Compression.NONE, TimestampType.CREATE_TIME, 0L);
+ ProducerBatch originalBatch = new ProducerBatch(tp1, builder, now,
true);
+
+ final AtomicInteger acked = new AtomicInteger(0);
+ Callback cb = (metadata, exception) -> acked.incrementAndGet();
+
+ Future<RecordMetadata> future = originalBatch.tryAppend(now, key,
largeValue, Record.EMPTY_HEADERS, cb, now);
+ assertNotNull(future, "Should be able to append the large record to
batch");
+ assertEquals(1, originalBatch.recordCount, "Original batch should
contain exactly one record");
+ originalBatch.close();
+
+ // controlled test case, retry behavior across multiple cycles
+ // 5 cycles for testing but mimics infinite retries in reality
+ final int maxRetryCycles = 5;
+
+ ProducerBatch currentBatch = originalBatch;
+ List<SplitAndReenqueueResult> results = new ArrayList<>();
+
+ for (int retryAttempt = 0; retryAttempt < maxRetryCycles;
retryAttempt++) {
+ SplitAndReenqueueResult result =
performSplitAndReenqueueCycle(accum, currentBatch, retryAttempt);
+ results.add(result);
+
+ // Verify that each retry produces exactly 1 "split" batch (cannot
be split further)
+ assertEquals(1, result.numSplitBatches, "Single record should
result in exactly one split batch in retry attempt " + retryAttempt);
+ assertEquals(1, result.originalRecordCount, "Original batch should
have exactly one record in retry attempt " + retryAttempt);
+ assertTrue(result.originalBatchSize > smallBatchSize, "Original
batch should exceed size limit in retry attempt " + retryAttempt);
+ assertEquals(1, result.splitBatch.recordCount, "Split batch should
still contain exactly one record in retry attempt " + retryAttempt);
+
+ // The split batch is still oversized and will fail with
MESSAGE_TOO_LARGE again
+ assertTrue(result.splitBatch.estimatedSizeInBytes() >
smallBatchSize,
+ "Split batch in retry " + retryAttempt + " is still oversized
and will fail MESSAGE_TOO_LARGE again");
+
+ // the new batch must be the split batch
+ currentBatch = result.splitBatch;
+ }
+
+ // making sure that all the retry attempts were tracked
+ assertEquals(maxRetryCycles, results.size(), "Should have tracked all
retry attempts");
+
+ // consistency across all retry cycles - each produces exactly 1
unsplittable batch
+ for (int i = 0; i < maxRetryCycles; i++) {
+ SplitAndReenqueueResult result = results.get(i);
+ assertEquals(1, result.numSplitBatches, "Retry attempt " + i + "
should produce exactly 1 split batch");
+ assertEquals(1, result.originalRecordCount, "Retry attempt " + i +
" should have exactly 1 record");
+ assertTrue(result.originalBatchSize > smallBatchSize, "Retry
attempt " + i + " batch should exceed size limit");
+ }
+ }
+
+ // here I am testing the hasRoomFor() behaviour:
+ // it allows the first record regardless of its size
+ // but does not allow a second record
Review Comment:
Cleaned up in 4762a99
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]