alievmirza commented on code in PR #7512:
URL: https://github.com/apache/ignite-3/pull/7512#discussion_r2759909229


##########
modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java:
##########
@@ -4729,6 +4735,126 @@ private TestCluster createClusterOf(List<TestPeer> 
peers) {
         );
     }
 
+    /**
+     * Test that OverloadException is thrown when byte size limit is exceeded.
+     */
+    @ParameterizedTest
+    @EnumSource(ApplyTaskMode.class)
+    public void testApplyQueueByteSizeThrottlingExceedsLimit(ApplyTaskMode 
mode) throws Exception {
+        RaftOptions raftOptions = new RaftOptions();
+        // Set limit to 300 KB
+        raftOptions.setMaxApplyQueueByteSize(300 * 1024);
+
+        Node node = setupSingleNodeClusterWithRaftOptions(raftOptions, mode);
+
+        int numTasks = 50;
+        AtomicInteger overloadCount = new AtomicInteger(0);
+        CountDownLatch latch = new CountDownLatch(numTasks);
+
+        List<Task> tasks = new ArrayList<>(numTasks);
+        for (int i = 0; i < numTasks; i++) {
+            byte[] bytes = new byte[100 * 1024]; // 100 KB each
+            ByteBuffer data = ByteBuffer.wrap(bytes);
+            Task task = new Task(data, new JoinableClosure(status -> {
+                if (!status.isOk()) {
+                    assertEquals(RaftError.EBUSY, status.getRaftError());
+                    assertTrue(status.getErrorMsg().contains("Node is busy, 
apply queue byte size limit exceeded"));
+                    overloadCount.incrementAndGet();
+                }
+                latch.countDown();

Review Comment:
   I've changed a bit solution, exception is thrown only when done closure is 
not set to Task. I've made that to conform the existing code and case when in 
non-blocking scenario we cannot publish a task.
   
   ```
                   if (!this.applyQueue.tryPublishEvent(translator)) {
                       String errorMsg = "Node is busy, has too many tasks, 
queue is full and bufferSize="+ this.applyQueue.getBufferSize();
                       
Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.getDone(), 
new Status(RaftError.EBUSY, errorMsg));
                       LOG.warn("Node {} applyQueue is overload.", getNodeId());
                       this.metrics.recordTimes("apply-task-overload-times", 1);
                       if (task.getDone() == null) {
                           throw new OverloadException(errorMsg);
                       }
                   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to