jayteej commented on code in PR #20358:
URL: https://github.com/apache/kafka/pull/20358#discussion_r2287279126


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1665,4 +1669,125 @@ int randomPartition() {
             return mockRandom == null ? super.randomPartition() : mockRandom.getAndIncrement();
         }
     }
+
+    /**
+     * This test verifies that RecordAccumulator's batch splitting functionality
+     * correctly handles oversized batches by splitting them down to individual
+     * records when necessary. It ensures that:
+     * 1. The splitting process can reduce batches to single-record size
+     * 2. The process does not enter infinite recursion loops
+     * 3. No records are lost or duplicated during splitting
+     * 4. The correct batch state is maintained throughout the process
+     */
+    @Test
+    public void testSplitAndReenqueuePreventInfiniteRecursion() throws InterruptedException {
+        // Initialize test environment with a large batch size
+        long now = time.milliseconds();
+        int batchSize = 1024 * 1024; // 1MB batch size
+        RecordAccumulator accum = createTestRecordAccumulator(batchSize, 10 * batchSize, Compression.gzip().build(),
+                10);
+
+        // Create a large producer batch manually (bypassing the accumulator's normal
+        // append process)
+        ByteBuffer buffer = ByteBuffer.allocate(batchSize);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L);
+        ProducerBatch bigBatch = new ProducerBatch(tp1, builder, now, true);
+
+        // Populate the batch with multiple records (100 records of 1KB each)
+        byte[] largeValue = new byte[1024]; // Each record is 1KB
+        for (int i = 0; i < 100; i++) {
+            ByteBuffer keyBytes = ByteBuffer.allocate(4);
+            keyBytes.putInt(i); // Use the loop counter as the key for verification later
+            FutureRecordMetadata result = bigBatch.tryAppend(time.milliseconds(), keyBytes.array(), largeValue,
+                    Record.EMPTY_HEADERS, null, time.milliseconds());
+            assertNotNull(result);
+        }
+        bigBatch.close();
+
+        time.sleep(101L); // Ensure the batch has time to become ready for processing
+
+        // Add the batch to the accumulator for splitting
+        accum.reenqueue(bigBatch, time.milliseconds());
+
+        // Iteratively split batches until we find single-record batches
+        // This section tests the core batch splitting functionality
+        int splitOperations = 0;
+        int maxSplitOperations = 100; // Safety limit to prevent infinite recursion
+        boolean foundSingleRecordBatch = false;
+
+        // Use a comparator that puts the batch with the most records first
+        Comparator<ProducerBatch> reverseComparator = (batch1, batch2) -> Integer.compare(batch2.recordCount,
+                batch1.recordCount);
+
+        while (splitOperations < maxSplitOperations || !foundSingleRecordBatch) {

Review Comment:
   Should this be `&&`? As written with `||`, the loop keeps running past `maxSplitOperations` for as long as no single-record batch has been found, so the safety limit never actually stops the loop.
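   
   A minimal sketch of the suggested fix, assuming the intent is for `maxSplitOperations` to act as a hard cap (variable names are taken from the diff above; the loop body and final assertion are illustrative placeholders, not the PR's code):
   
   ```java
   // Assumption: the cap should end the loop even when no single-record
   // batch has been found yet, letting the test fail on an assertion
   // instead of looping forever.
   while (splitOperations < maxSplitOperations && !foundSingleRecordBatch) {
       // ... perform one split-and-reenqueue iteration, update
       // foundSingleRecordBatch, and increment splitOperations ...
   }
   assertTrue(foundSingleRecordBatch,
           "Did not reach a single-record batch within " + maxSplitOperations + " split operations");
   ```
   
   With `&&`, both guards are effective: the loop exits as soon as a single-record batch is found, and the cap bounds the number of iterations either way.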



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
