shashankhs11 commented on code in PR #20285:
URL: https://github.com/apache/kafka/pull/20285#discussion_r2409117239


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1066,6 +1066,146 @@ public void testSplitAndReenqueue() throws 
ExecutionException, InterruptedExcept
         assertEquals(1, future2.get().offset());
     }
 
+    // This test confirms us that splitting a single large record
+    // creates an unsplittable batch (does not really split it)
+    // that will continue to fail with MESSAGE_TOO_LARGE,
+    // causing infinite retry loops
+    @Test
+    public void testSplitAndReenqueueWithSingleLargeRecord() throws 
ExecutionException, InterruptedException {
+        long now = time.milliseconds();
+        int smallBatchSize = 1024;
+        RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize, 
10 * 1024, Compression.NONE, 10);
+
+        // create a single record that is much larger than the batch size limit
+        // we are trying to mimic by send a record larger than broker's 
message.max.bytes
+        byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB
+        
+        // Create a buffer with enough space for the large record
+        ByteBuffer buffer = ByteBuffer.allocate(8192);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE, TimestampType.CREATE_TIME, 0L);
+        ProducerBatch batch = new ProducerBatch(tp1, builder, now, true);
+        
+        final AtomicInteger acked = new AtomicInteger(0);
+        Callback cb = (metadata, exception) -> acked.incrementAndGet();
+
+        // create a large batch but only with one single record
+        Future<RecordMetadata> future = batch.tryAppend(now, key, largeValue, 
Record.EMPTY_HEADERS, cb, now);
+        assertNotNull(future, "Should be able to append the large record to 
batch");
+        assertEquals(1, batch.recordCount, "Batch should contain exactly one 
record");
+        batch.close();
+        
+        // try to split and reenqueue a single large record
+        SplitAndReenqueueResult result = performSplitAndReenqueueCycle(accum, 
batch, 0);
+        
+        // The below asserts tests that the single large record
+        // results in exactly one "split" batch
+        assertEquals(1, result.numSplitBatches, "Single large record should 
result in exactly one split batch");
+        assertEquals(1, result.originalRecordCount, "Original batch should 
have exactly one record");
+        assertEquals(1, result.splitBatch.recordCount, "Split batch should 
still contain exactly one record");
+        assertTrue(result.originalBatchSize > smallBatchSize, "Original batch 
should exceed batch size limit");
+        
+        // the "split" batch is still oversized and contains the same record
+        assertTrue(result.splitBatch.estimatedSizeInBytes() > smallBatchSize, 
+            "Split batch is still oversized - it cannot be split further and 
will cause an error, will retry infinitely");
+    }
+
+    // This test retries for infinite times (controlled for 5 times for 
testing)
+    // because the record can never be split further
+    @Test
+    public void testRetrySplitAndReenqueueBehaviourWithSingleLargeRecord() 
throws ExecutionException, InterruptedException {
+        long now = time.milliseconds();
+        int smallBatchSize = 1024;
+        RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize, 
10 * 1024, Compression.NONE, 10);
+
+        // create a single record that is much larger than the batch size limit
+        // we are trying to mimic by send a record larger than broker's 
message.max.bytes
+        byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB
+
+        // Create a buffer with enough space for the large record
+        ByteBuffer buffer = ByteBuffer.allocate(8192);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE, TimestampType.CREATE_TIME, 0L);
+        ProducerBatch originalBatch = new ProducerBatch(tp1, builder, now, 
true);
+
+        final AtomicInteger acked = new AtomicInteger(0);
+        Callback cb = (metadata, exception) -> acked.incrementAndGet();
+
+        Future<RecordMetadata> future = originalBatch.tryAppend(now, key, 
largeValue, Record.EMPTY_HEADERS, cb, now);
+        assertNotNull(future, "Should be able to append the large record to 
batch");
+        assertEquals(1, originalBatch.recordCount, "Original batch should 
contain exactly one record");
+        originalBatch.close();
+
+        // controlled test case, retry behavior across multiple cycles
+        // 5 cycles for testing but mimics infinite retries in reality
+        final int maxRetryCycles = 5;
+
+        ProducerBatch currentBatch = originalBatch;
+        List<SplitAndReenqueueResult> results = new ArrayList<>();
+
+        for (int retryAttempt = 0; retryAttempt < maxRetryCycles; 
retryAttempt++) {
+            SplitAndReenqueueResult result = 
performSplitAndReenqueueCycle(accum, currentBatch, retryAttempt);
+            results.add(result);
+
+            // Verify that each retry produces exactly 1 "split" batch (cannot 
be split further)
+            assertEquals(1, result.numSplitBatches, "Single record should 
result in exactly one split batch in retry attempt " + retryAttempt);
+            assertEquals(1, result.originalRecordCount, "Original batch should 
have exactly one record in retry attempt " + retryAttempt);
+            assertTrue(result.originalBatchSize > smallBatchSize, "Original 
batch should exceed size limit in retry attempt " + retryAttempt);
+            assertEquals(1, result.splitBatch.recordCount, "Split batch should 
still contain exactly one record in retry attempt " + retryAttempt);
+
+            // The split batch is still oversized and will fail with 
MESSAGE_TOO_LARGE again
+            assertTrue(result.splitBatch.estimatedSizeInBytes() > 
smallBatchSize,
+                "Split batch in retry " + retryAttempt + " is still oversized 
and will fail MESSAGE_TOO_LARGE again");
+
+            // the new batch must be the split batch
+            currentBatch = result.splitBatch;
+        }
+
+        // making sure that all the retry attempts were tracked
+        assertEquals(maxRetryCycles, results.size(), "Should have tracked all 
retry attempts");
+
+        // consistency across all retry cycles - each produces exactly 1 
unsplittable batch
+        for (int i = 0; i < maxRetryCycles; i++) {
+            SplitAndReenqueueResult result = results.get(i);
+            assertEquals(1, result.numSplitBatches, "Retry attempt " + i + " 
should produce exactly 1 split batch");
+            assertEquals(1, result.originalRecordCount, "Retry attempt " + i + 
" should have exactly 1 record");
+            assertTrue(result.originalBatchSize > smallBatchSize, "Retry 
attempt " + i + " batch should exceed size limit");
+        }
+    }
+
+    // This test verifies the hasRoomFor() behaviour:
+    // it allows the first record regardless of its size,
+    // but does not allow a second record

Review Comment:
   Cleaned up in 4762a99



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to