quantranhong1999 commented on code in PR #1854:
URL: https://github.com/apache/james-project/pull/1854#discussion_r1426495110


##########
server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java:
##########
@@ -115,6 +118,82 @@ void throttleShouldRejectTasksWhenTheQueueIsFull() {
         // And the task is not executed
         assertThat(executed.get()).isFalse();
     }
+    @Test
+    void throttleShouldRecoverFromABurst() throws Exception {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 2);
+
+        // When I submit too many tasks task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+
+        Thread.sleep(500);
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> 
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+        // And the task is not executed
+        assertThat(executed.get()).isTrue();
+    }
+    @Test
+    void throttleShouldHandleDisposal() throws Exception {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 2);
+
+        // When I submit too many tasks task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Disposable subscribe1 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe2 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe3 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe4 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe5 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe6 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe7 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe8 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        subscribe1.dispose();
+        subscribe2.dispose();
+        subscribe3.dispose();
+        subscribe4.dispose();
+        subscribe5.dispose();
+        subscribe6.dispose();
+        subscribe7.dispose();
+        subscribe8.dispose();
+
+        Thread.sleep(100);
+
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> 
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+        // And the task is not executed
+        assertThat(executed.get()).isTrue();
+    }
+    @RepeatedTest(10)
+    void throttleShouldBeConcurrentFriendly() throws Exception {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 2);
+
+        ConcurrentTestRunner.builder()
+            .operation((a, b) -> 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE))
+                .onErrorResume(ReactiveThrottler.RejectedException.class, e -> 
Mono.empty())
+                .block())
+            .threadCount(20)
+            .operationCount(5)
+            .runSuccessfullyWithin(Duration.ofSeconds(10));
+
+        Thread.sleep(100);
+
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> 
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+        // And the task is not executed
+        assertThat(executed.get()).isTrue();

Review Comment:
   ```suggestion
           // And the task is executed
           assertThat(executed.get()).isTrue();
   ```



##########
server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java:
##########
@@ -115,6 +118,82 @@ void throttleShouldRejectTasksWhenTheQueueIsFull() {
         // And the task is not executed
         assertThat(executed.get()).isFalse();
     }
+    @Test
+    void throttleShouldRecoverFromABurst() throws Exception {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 2);
+
+        // When I submit too many tasks task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+
+        Thread.sleep(500);
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> 
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+        // And the task is not executed
+        assertThat(executed.get()).isTrue();
+    }
+    @Test
+    void throttleShouldHandleDisposal() throws Exception {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 2);
+
+        // When I submit too many tasks task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Disposable subscribe1 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe2 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe3 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe4 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe5 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe6 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe7 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe8 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        subscribe1.dispose();
+        subscribe2.dispose();
+        subscribe3.dispose();
+        subscribe4.dispose();
+        subscribe5.dispose();
+        subscribe6.dispose();
+        subscribe7.dispose();
+        subscribe8.dispose();
+
+        Thread.sleep(100);
+
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> 
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+        // And the task is not executed
+        assertThat(executed.get()).isTrue();

Review Comment:
   ```suggestion
           // And the task is executed
           assertThat(executed.get()).isTrue();
   ```



##########
server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java:
##########
@@ -115,6 +118,82 @@ void throttleShouldRejectTasksWhenTheQueueIsFull() {
         // And the task is not executed
         assertThat(executed.get()).isFalse();
     }
+    @Test
+    void throttleShouldRecoverFromABurst() throws Exception {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 2);
+
+        // When I submit too many tasks task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+
+        Thread.sleep(500);
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> 
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+        // And the task is not executed
+        assertThat(executed.get()).isTrue();

Review Comment:
   ```suggestion
           // And the task is executed
           assertThat(executed.get()).isTrue();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to