This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 6243bf7d4e JAMES-3977 Test for backpressure
6243bf7d4e is described below

commit 6243bf7d4e01d1102eb3728ba3c5c3486b3262d9
Author: Benoit TELLIER <[email protected]>
AuthorDate: Mon Mar 18 12:55:10 2024 +0100

    JAMES-3977 Test for backpressure
    
     - Rely on mock to enforce that backpressure is applied
     - Ensure that the FETCH command proceeds once the client reads
---
 .../james/imapserver/netty/IMAPServerTest.java     | 47 ++++++++++++++++++++--
 1 file changed, 43 insertions(+), 4 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
index 2a181282aa..7222755b97 100644
--- a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
@@ -29,6 +29,11 @@ import static 
org.apache.james.mailbox.MessageManager.MailboxMetaData.RecentMode
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 
 import java.io.EOFException;
 import java.io.FileNotFoundException;
@@ -44,6 +49,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.IntStream;
 
@@ -107,6 +113,8 @@ import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.mockserver.integration.ClientAndServer;
 import org.mockserver.model.HttpRequest;
 import org.mockserver.model.HttpResponse;
@@ -127,6 +135,7 @@ import io.netty.handler.codec.compression.ZlibWrapper;
 import io.netty.handler.ssl.SslContextBuilder;
 import nl.altindag.ssl.exception.GenericKeyStoreException;
 import nl.altindag.ssl.pem.exception.PrivateKeyParseException;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.netty.Connection;
@@ -145,6 +154,7 @@ class IMAPServerTest {
 
     @RegisterExtension
     public TestIMAPClient testIMAPClient = new TestIMAPClient();
+    private InMemoryMailboxManager mailboxManager;
 
     private IMAPServer 
createImapServer(HierarchicalConfiguration<ImmutableNode> config,
                                         InMemoryIntegrationResources 
inMemoryIntegrationResources) throws Exception {
@@ -152,15 +162,16 @@ class IMAPServerTest {
 
         RecordingMetricFactory metricFactory = new RecordingMetricFactory();
         Set<ConnectionCheck> connectionChecks = defaultConnectionChecks();
+        mailboxManager = spy(memoryIntegrationResources.getMailboxManager());
         IMAPServer imapServer = new IMAPServer(
             DefaultImapDecoderFactory.createDecoder(),
             new DefaultImapEncoderFactory().buildImapEncoder(),
             DefaultImapProcessorFactory.createXListSupportingProcessor(
-                memoryIntegrationResources.getMailboxManager(),
+                mailboxManager,
                 memoryIntegrationResources.getEventBus(),
-                new 
StoreSubscriptionManager(memoryIntegrationResources.getMailboxManager().getMapperFactory(),
-                    
memoryIntegrationResources.getMailboxManager().getMapperFactory(),
-                    
memoryIntegrationResources.getMailboxManager().getEventBus()),
+                new StoreSubscriptionManager(mailboxManager.getMapperFactory(),
+                    mailboxManager.getMapperFactory(),
+                    mailboxManager.getEventBus()),
                 null,
                 memoryIntegrationResources.getQuotaManager(),
                 memoryIntegrationResources.getQuotaRootResolver(),
@@ -2501,6 +2512,34 @@ class IMAPServerTest {
             // Then the FETCH
             readStringUntil(clientConnection, s -> s.contains("A2 OK FETCH 
completed."));
         }
+
+        @Test
+        void fetchShouldBackPressureWhenNoRead() throws Exception {
+            String msgIn = "MIME-Version: 1.0\r\n\r\nCONTENT\r\n\r\n" + 
"0123456789\r\n0123456789\r\n0123456789\r\n".repeat(1024);
+            IntStream.range(0, 500)
+                .forEach(Throwing.intConsumer(i -> 
inbox.appendMessage(MessageManager.AppendCommand.builder()
+                    .build(msgIn), mailboxSession)));
+            AtomicInteger loaded = new AtomicInteger(0);
+            MessageManager inboxSpy = spy(inbox);
+            
doReturn(Mono.just(inboxSpy)).when(mailboxManager).getMailboxReactive(eq(MailboxPath.inbox(USER)),
 any());
+            
doReturn(Mono.just(inboxSpy)).when(mailboxManager).getMailboxReactive(eq(inbox.getMailboxEntity().getMailboxId()),
 any());
+            doAnswer((Answer<Object>) invocationOnMock -> 
Flux.from(inbox.getMessagesReactive(invocationOnMock.getArgument(0), 
invocationOnMock.getArgument(1), invocationOnMock.getArgument(2)))
+                .doOnNext(any -> 
loaded.incrementAndGet())).when(inboxSpy).getMessagesReactive(any(), any(), 
any());
+
+            clientConnection.write(ByteBuffer.wrap(String.format("a0 LOGIN %s 
%s\r\n", USER.asString(), USER_PASS).getBytes(StandardCharsets.UTF_8)));
+            readBytes(clientConnection);
+
+            clientConnection.write(ByteBuffer.wrap(("A1 SELECT 
INBOX\r\n").getBytes(StandardCharsets.UTF_8)));
+            // Select completes first
+            readStringUntil(clientConnection, s -> s.contains("A1 OK 
[READ-WRITE] SELECT completed."));
+            clientConnection.write(ByteBuffer.wrap(("A2 UID FETCH 1:500 
(BODY[])\r\n").getBytes(StandardCharsets.UTF_8)));
+
+            Thread.sleep(1000);
+
+            assertThat(loaded.get()).isLessThan(500);
+            readStringUntil(clientConnection, s -> s.contains("A2 OK FETCH 
completed."));
+            assertThat(loaded.get()).isEqualTo(500);
+        }
     }
 
     private byte[] readBytes(SocketChannel channel) throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to