ibessonov commented on code in PR #7512:
URL: https://github.com/apache/ignite-3/pull/7512#discussion_r2758673644
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java:
##########
@@ -315,6 +322,21 @@ private void reset() {
}
this.tasks.clear();
}
+
+ private void decrementApplyQueueByteSize() {
+ long maxQueueSize =
NodeImpl.this.raftOptions.getMaxApplyQueueByteSize();
+
+ if (maxQueueSize > 0) {
+ long totalSize = 0;
+ for (LogEntryAndClosure task : tasks) {
+ ByteBuffer data = task.entry.getData();
+ totalSize += data.remaining();
+ }
Review Comment:
So, you're telling me that executing each task is now `O(number of tasks)`.
I can't accept that; it should be `O(1)` in those terms. Can't we get the
`totalSize` from the `LogEntryAndClosure event` or something?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java:
##########
@@ -2127,6 +2149,25 @@ public void apply(final Task task) {
throw new IllegalStateException("Node is shutting down");
}
Requires.requireNonNull(task, "Null task");
+ Requires.requireNonNull(task.getData(), "Null data");
+
+ int taskDataSize = task.getData().remaining();
+
+ long maxQueueSize = this.raftOptions.getMaxApplyQueueByteSize();
+
+ if (maxQueueSize > 0) {
+ long currentSize = this.applyQueueByteSize.sum();
+ if (currentSize + taskDataSize > maxQueueSize) {
Review Comment:
What are we going to do about the obvious race between this check and the
actual `this.applyQueueByteSize.add(taskDataSize);`?
##########
modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java:
##########
@@ -4729,6 +4735,126 @@ private TestCluster createClusterOf(List<TestPeer>
peers) {
);
}
+ /**
+ * Test that OverloadException is thrown when byte size limit is exceeded.
Review Comment:
```suggestion
* Test that {@link OverloadException} is thrown when byte size limit is
exceeded.
```
##########
modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java:
##########
@@ -4729,6 +4735,126 @@ private TestCluster createClusterOf(List<TestPeer>
peers) {
);
}
+ /**
+ * Test that OverloadException is thrown when byte size limit is exceeded.
+ */
+ @ParameterizedTest
+ @EnumSource(ApplyTaskMode.class)
+ public void testApplyQueueByteSizeThrottlingExceedsLimit(ApplyTaskMode
mode) throws Exception {
+ RaftOptions raftOptions = new RaftOptions();
+ // Set limit to 300 KB
+ raftOptions.setMaxApplyQueueByteSize(300 * 1024);
+
+ Node node = setupSingleNodeClusterWithRaftOptions(raftOptions, mode);
+
+ int numTasks = 50;
+ AtomicInteger overloadCount = new AtomicInteger(0);
+ CountDownLatch latch = new CountDownLatch(numTasks);
+
+ List<Task> tasks = new ArrayList<>(numTasks);
+ for (int i = 0; i < numTasks; i++) {
+ byte[] bytes = new byte[100 * 1024]; // 100 KB each
+ ByteBuffer data = ByteBuffer.wrap(bytes);
+ Task task = new Task(data, new JoinableClosure(status -> {
+ if (!status.isOk()) {
+ assertEquals(RaftError.EBUSY, status.getRaftError());
+ assertTrue(status.getErrorMsg().contains("Node is busy,
apply queue byte size limit exceeded"));
+ overloadCount.incrementAndGet();
+ }
+ latch.countDown();
Review Comment:
Could you please add some explanatory comments to the test? Why do we have
two places with the same logic here? Does this depend on `mode`?
##########
modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java:
##########
@@ -4957,6 +5083,45 @@ private void triggerLeaderSnapshot(TestCluster cluster,
Node leader) throws Inte
triggerLeaderSnapshot(cluster, leader, 1);
}
+ /**
+ * Helper method to set up a single-node cluster with custom RaftOptions
for testing.
+ *
+ * @param raftOptions The RaftOptions to use for the node
+ * @return The started node (already elected as leader)
+ */
+ private Node setupSingleNodeClusterWithRaftOptions(RaftOptions
raftOptions, ApplyTaskMode mode) {
+ TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
+
+ NodeOptions nodeOptions = createNodeOptions(0);
+ nodeOptions.setApplyTaskMode(mode);
+ nodeOptions.setRaftOptions(raftOptions);
+ MockStateMachine fsm = new MockStateMachine(peer.getPeerId());
+ nodeOptions.setFsm(fsm);
+ nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
+ nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
+ nodeOptions.setInitialConf(new
Configuration(Collections.singletonList(peer.getPeerId())));
+
+ RaftGroupService service = createService("unittest", peer,
nodeOptions, List.of());
+ Node node = service.start();
+
+ await().until(node::isLeader);
+
+ return node;
+ }
+
+ /**
+ * Helper method to get the current apply queue byte size for assertions.
+ *
+ * @param node The node to check
+ * @return The current applyQueueByteSize value
+ */
+ private static long getApplyQueueByteSize(Node node) throws Exception {
+ Field field = NodeImpl.class.getDeclaredField("applyQueueByteSize");
+ field.setAccessible(true);
+ LongAdder counter = (LongAdder) field.get(node);
Review Comment:
Please use
`org.apache.ignite.internal.testframework.IgniteTestUtils#getFieldValue`.
Also, why don't we have a getter instead?
##########
modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java:
##########
@@ -4729,6 +4735,126 @@ private TestCluster createClusterOf(List<TestPeer>
peers) {
);
}
+ /**
+ * Test that OverloadException is thrown when byte size limit is exceeded.
+ */
+ @ParameterizedTest
+ @EnumSource(ApplyTaskMode.class)
+ public void testApplyQueueByteSizeThrottlingExceedsLimit(ApplyTaskMode
mode) throws Exception {
+ RaftOptions raftOptions = new RaftOptions();
+ // Set limit to 300 KB
+ raftOptions.setMaxApplyQueueByteSize(300 * 1024);
+
+ Node node = setupSingleNodeClusterWithRaftOptions(raftOptions, mode);
+
+ int numTasks = 50;
+ AtomicInteger overloadCount = new AtomicInteger(0);
+ CountDownLatch latch = new CountDownLatch(numTasks);
+
+ List<Task> tasks = new ArrayList<>(numTasks);
+ for (int i = 0; i < numTasks; i++) {
+ byte[] bytes = new byte[100 * 1024]; // 100 KB each
+ ByteBuffer data = ByteBuffer.wrap(bytes);
+ Task task = new Task(data, new JoinableClosure(status -> {
+ if (!status.isOk()) {
+ assertEquals(RaftError.EBUSY, status.getRaftError());
+ assertTrue(status.getErrorMsg().contains("Node is busy,
apply queue byte size limit exceeded"));
+ overloadCount.incrementAndGet();
+ }
+ latch.countDown();
+ }));
+ tasks.add(task);
+ }
+
+ for (Task task : tasks) {
+ try {
+ node.apply(task);
+ } catch (OverloadException e) {
+ assertTrue(e.getMessage().contains("Node is busy, apply queue
byte size limit exceeded"));
+ overloadCount.incrementAndGet();
+ latch.countDown();
+ }
+ }
+
+ waitLatch(latch);
+
+ assertTrue(overloadCount.get() > 0, "Expected some tasks to be
rejected due to byte size limit");
+
+ assertEquals(0, getApplyQueueByteSize(node), "Apply queue byte size
should be 0 after all tasks are processed");
+ }
+
+ /**
+ * Test that tasks succeed when under the byte size limit.
+ */
+ @ParameterizedTest
+ @EnumSource(ApplyTaskMode.class)
+ public void testApplyQueueByteSizeThrottlingUnderLimit(ApplyTaskMode mode)
throws Exception {
+ RaftOptions raftOptions = new RaftOptions();
+ // Set limit to 10 MB
+ raftOptions.setMaxApplyQueueByteSize(10 * 1024 * 1024);
+
+ Node node = setupSingleNodeClusterWithRaftOptions(raftOptions, mode);
+
+ CountDownLatch latch = new CountDownLatch(50);
+ AtomicInteger successCount = new AtomicInteger(0);
+
+ for (int i = 0; i < 50; i++) {
+ byte[] bytes = new byte[10 * 1024]; // 10 KB
+ ByteBuffer data = ByteBuffer.wrap(bytes);
+ Task task = new Task(data, new JoinableClosure(status -> {
+ assertTrue(status.isOk(), "Task should succeed when under
limit: " + status);
+ successCount.incrementAndGet();
+ latch.countDown();
+ }));
+ node.apply(task);
+ }
+
+ waitLatch(latch);
+
+ assertEquals(50, successCount.get());
+
+ assertEquals(0, getApplyQueueByteSize(node), "Apply queue byte size
should be 0 after all tasks are processed");
Review Comment:
How do you guarantee that this condition won't cause the test to become
flaky?
Nothing about `latch` enforces that the size has already been decreased. The
decrement may happen after `latch.countDown()`, so checking the counter right
after `waitLatch(latch)` is a data race, if I'm correct.
Please re-check. The same question applies to the rest of your tests.
##########
modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java:
##########
@@ -4729,6 +4735,126 @@ private TestCluster createClusterOf(List<TestPeer>
peers) {
);
}
+ /**
+ * Test that OverloadException is thrown when byte size limit is exceeded.
+ */
+ @ParameterizedTest
+ @EnumSource(ApplyTaskMode.class)
+ public void testApplyQueueByteSizeThrottlingExceedsLimit(ApplyTaskMode
mode) throws Exception {
+ RaftOptions raftOptions = new RaftOptions();
+ // Set limit to 300 KB
+ raftOptions.setMaxApplyQueueByteSize(300 * 1024);
+
+ Node node = setupSingleNodeClusterWithRaftOptions(raftOptions, mode);
+
+ int numTasks = 50;
+ AtomicInteger overloadCount = new AtomicInteger(0);
+ CountDownLatch latch = new CountDownLatch(numTasks);
+
+ List<Task> tasks = new ArrayList<>(numTasks);
+ for (int i = 0; i < numTasks; i++) {
+ byte[] bytes = new byte[100 * 1024]; // 100 KB each
+ ByteBuffer data = ByteBuffer.wrap(bytes);
+ Task task = new Task(data, new JoinableClosure(status -> {
+ if (!status.isOk()) {
+ assertEquals(RaftError.EBUSY, status.getRaftError());
+ assertTrue(status.getErrorMsg().contains("Node is busy,
apply queue byte size limit exceeded"));
+ overloadCount.incrementAndGet();
+ }
+ latch.countDown();
+ }));
+ tasks.add(task);
+ }
+
+ for (Task task : tasks) {
+ try {
+ node.apply(task);
+ } catch (OverloadException e) {
+ assertTrue(e.getMessage().contains("Node is busy, apply queue
byte size limit exceeded"));
+ overloadCount.incrementAndGet();
+ latch.countDown();
+ }
+ }
+
+ waitLatch(latch);
+
+ assertTrue(overloadCount.get() > 0, "Expected some tasks to be
rejected due to byte size limit");
+
+ assertEquals(0, getApplyQueueByteSize(node), "Apply queue byte size
should be 0 after all tasks are processed");
+ }
+
+ /**
+ * Test that tasks succeed when under the byte size limit.
+ */
+ @ParameterizedTest
+ @EnumSource(ApplyTaskMode.class)
+ public void testApplyQueueByteSizeThrottlingUnderLimit(ApplyTaskMode mode)
throws Exception {
+ RaftOptions raftOptions = new RaftOptions();
+ // Set limit to 10 MB
+ raftOptions.setMaxApplyQueueByteSize(10 * 1024 * 1024);
+
+ Node node = setupSingleNodeClusterWithRaftOptions(raftOptions, mode);
+
+ CountDownLatch latch = new CountDownLatch(50);
+ AtomicInteger successCount = new AtomicInteger(0);
+
+ for (int i = 0; i < 50; i++) {
+ byte[] bytes = new byte[10 * 1024]; // 10 KB
+ ByteBuffer data = ByteBuffer.wrap(bytes);
+ Task task = new Task(data, new JoinableClosure(status -> {
+ assertTrue(status.isOk(), "Task should succeed when under
limit: " + status);
+ successCount.incrementAndGet();
+ latch.countDown();
+ }));
+ node.apply(task);
+ }
+
+ waitLatch(latch);
+
+ assertEquals(50, successCount.get());
+
+ assertEquals(0, getApplyQueueByteSize(node), "Apply queue byte size
should be 0 after all tasks are processed");
+ }
+
+ /**
+ * Test that byte size counter is properly decremented when tasks are
processed.
+ */
+ @ParameterizedTest
+ @EnumSource(ApplyTaskMode.class)
+ public void testApplyQueueByteSizeCounterDecrements(ApplyTaskMode mode)
throws Exception {
+ RaftOptions raftOptions = new RaftOptions();
+ // Set limit to 2 MB, but apply overall ~3 MB of tasks
+ raftOptions.setMaxApplyQueueByteSize(2 * 1024 * 1024);
+ raftOptions.setApplyBatch(10);
Review Comment:
`10` is used three times here; I suppose it would make sense to extract it into
a variable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]