alievmirza commented on code in PR #7512:
URL: https://github.com/apache/ignite-3/pull/7512#discussion_r2759796420


##########
modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java:
##########
@@ -4729,6 +4735,126 @@ private TestCluster createClusterOf(List<TestPeer> 
peers) {
         );
     }
 
+    /**
+     * Test that OverloadException is thrown when byte size limit is exceeded.
+     */
+    @ParameterizedTest
+    @EnumSource(ApplyTaskMode.class)
+    public void testApplyQueueByteSizeThrottlingExceedsLimit(ApplyTaskMode 
mode) throws Exception {
+        RaftOptions raftOptions = new RaftOptions();
+        // Set limit to 300 KB
+        raftOptions.setMaxApplyQueueByteSize(300 * 1024);
+
+        Node node = setupSingleNodeClusterWithRaftOptions(raftOptions, mode);
+
+        int numTasks = 50;
+        AtomicInteger overloadCount = new AtomicInteger(0);
+        CountDownLatch latch = new CountDownLatch(numTasks);
+
+        List<Task> tasks = new ArrayList<>(numTasks);
+        for (int i = 0; i < numTasks; i++) {
+            byte[] bytes = new byte[100 * 1024]; // 100 KB each
+            ByteBuffer data = ByteBuffer.wrap(bytes);
+            Task task = new Task(data, new JoinableClosure(status -> {
+                if (!status.isOk()) {
+                    assertEquals(RaftError.EBUSY, status.getRaftError());
+                    assertTrue(status.getErrorMsg().contains("Node is busy, 
apply queue byte size limit exceeded"));
+                    overloadCount.incrementAndGet();
+                }
+                latch.countDown();
+            }));
+            tasks.add(task);
+        }
+
+        for (Task task : tasks) {
+            try {
+                node.apply(task);
+            } catch (OverloadException e) {
+                assertTrue(e.getMessage().contains("Node is busy, apply queue 
byte size limit exceeded"));
+                overloadCount.incrementAndGet();
+                latch.countDown();
+            }
+        }
+
+        waitLatch(latch);
+
+        assertTrue(overloadCount.get() > 0, "Expected some tasks to be 
rejected due to byte size limit");
+
+        assertEquals(0, getApplyQueueByteSize(node), "Apply queue byte size 
should be 0 after all tasks are processed");
+    }
+
+    /**
+     * Test that tasks succeed when under the byte size limit.
+     */
+    @ParameterizedTest
+    @EnumSource(ApplyTaskMode.class)
+    public void testApplyQueueByteSizeThrottlingUnderLimit(ApplyTaskMode mode) 
throws Exception {
+        RaftOptions raftOptions = new RaftOptions();
+        // Set limit to 10 MB
+        raftOptions.setMaxApplyQueueByteSize(10 * 1024 * 1024);
+
+        Node node = setupSingleNodeClusterWithRaftOptions(raftOptions, mode);
+
+        CountDownLatch latch = new CountDownLatch(50);
+        AtomicInteger successCount = new AtomicInteger(0);
+
+        for (int i = 0; i < 50; i++) {
+            byte[] bytes = new byte[10 * 1024]; // 10 KB
+            ByteBuffer data = ByteBuffer.wrap(bytes);
+            Task task = new Task(data, new JoinableClosure(status -> {
+                assertTrue(status.isOk(), "Task should succeed when under 
limit: " + status);
+                successCount.incrementAndGet();
+                latch.countDown();
+            }));
+            node.apply(task);
+        }
+
+        waitLatch(latch);
+
+        assertEquals(50, successCount.get());
+
+        assertEquals(0, getApplyQueueByteSize(node), "Apply queue byte size 
should be 0 after all tasks are processed");

Review Comment:
   What is happening under the hood:
   
   1. we call `decrementApplyQueueByteSize();`
   2. then `executeApplyingTasks(this.tasks);`
   3. then `LogEntryAndClosureHandler#reset();`
   
   let's describe every step: 
   
   1. here we actually decrement our tracker with the sizes of tasks.
   2. here we pass all done closure to 
`this.closureQueue.appendPendingClosure`, which will be called when 
`org.apache.ignite.raft.jraft.core.FSMCallerImpl#doCommitted` called.
   3. Here we just call reset() (just nullify all references) for all tasks and 
call tasks.clear();
   
   So, I want to say that after `waitLatch(latch);` we can guarantee that 
`applyQueueByteSize` is decreased for every task that has passed through 
`NodeImpl#applyQueue` and propagated further to RAFT flow. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to