This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch quorum-queue-fix-3.7
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 97bd7e4ab9095b4a588f721c865c57526274c848
Author: Rene Cordier <[email protected]>
AuthorDate: Wed Jul 5 10:09:51 2023 +0700

    Revert "JAMES-3919 RabbitMQMailQueue: clean up cassandra projection when we c… (#1609) (#1616)"
    
    This reverts commit f44d1efc37c26cfabfdc9dfe482698149d5caa78.
---
 .../backends/rabbitmq/RabbitMQManagementAPI.java   |  3 --
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |  2 +-
 .../org/apache/james/queue/rabbitmq/Enqueuer.java  | 13 +----
 .../james/queue/rabbitmq/RabbitMQMailQueue.java    |  4 +-
 .../queue/rabbitmq/view/api/MailQueueView.java     |  3 +-
 .../view/cassandra/CassandraMailQueueView.java     | 12 +++--
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 58 ++++++++--------------
 7 files changed, 34 insertions(+), 61 deletions(-)

diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
index 826925ef1e..5661a4d314 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
@@ -441,9 +441,6 @@ public interface RabbitMQManagementAPI {
     @RequestLine(value = "DELETE /api/queues/{vhost}/{name}", decodeSlash = 
false)
     void deleteQueue(@Param("vhost") String vhost, @Param("name") String name);
 
-    @RequestLine(value = "DELETE /api/queues/{vhost}/{name}/contents", 
decodeSlash = false)
-    void purgeQueue(@Param("vhost") String vhost, @Param("name") String name);
-
     @RequestLine(value = "GET /api/exchanges/{vhost}/{name}/bindings/source", 
decodeSlash = false)
     List<BindingSource> listBindings(@Param("vhost") String vhost, 
@Param("name") String name);
 
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 671400a06d..573b7a2497 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -140,7 +140,7 @@ class Dequeuer {
             if (success) {
                 dequeueMetric.increment();
                 response.ack();
-                
Mono.from(mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(),
 mailWithEnqueueId.getBlobIds()))).block();
+                
mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(),
 mailWithEnqueueId.getBlobIds()));
             } else {
                 response.nack(REQUEUE);
             }
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index b6d97b74fc..b72139c942 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -25,7 +25,6 @@ import static 
org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX;
 
 import java.time.Clock;
 import java.time.Duration;
-import java.util.function.Function;
 
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
@@ -35,7 +34,6 @@ import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
-import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
 import org.apache.mailet.Mail;
@@ -81,20 +79,11 @@ class Enqueuer {
                 return Flux.mergeDelayError(2,
                         mailQueueView.storeMail(enqueuedItem),
                         publishReferenceToRabbit(mailReference))
-                        .then()
-                        .onErrorResume(cleanupMailQueueView(enqueueId, 
mailReference));
+                        .then();
             }).sneakyThrow())
             .thenEmpty(Mono.fromRunnable(enqueueMetric::increment));
     }
 
-    private Function<Throwable, Mono<Void>> cleanupMailQueueView(EnqueueId 
enqueueId, MailReference mailReference) {
-        return (Throwable e) -> {
-            DeleteCondition.WithEnqueueId deleteCondition = 
DeleteCondition.withEnqueueId(enqueueId, mailReference.getPartsId());
-            return Mono.from(mailQueueView.delete(deleteCondition))
-                    .thenReturn(Mono.<Void>error(e));
-        };
-    }
-
     Mono<Void> reQueue(CassandraMailQueueBrowser.CassandraMailQueueItemView 
item) {
         Mail mail = item.getMail();
         return Mono.fromCallable(() -> new MailReference(item.getEnqueuedId(), 
mail, item.getEnqueuedPartsId()))
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index f311d7bd9a..602e679c33 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -116,12 +116,12 @@ public class RabbitMQMailQueue implements 
ManageableMailQueue {
 
     @Override
     public long clear() {
-        return Mono.from(mailQueueView.delete(DeleteCondition.all())).block();
+        return mailQueueView.delete(DeleteCondition.all());
     }
 
     @Override
     public long remove(Type type, String value) {
-        return Mono.from(mailQueueView.delete(DeleteCondition.from(type, 
value))).block();
+        return mailQueueView.delete(DeleteCondition.from(type, value));
     }
 
     @Override
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
index 7afcf99fc7..921a08bdcd 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
@@ -25,7 +25,6 @@ import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.rabbitmq.EnqueueId;
 import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
-import org.reactivestreams.Publisher;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -40,7 +39,7 @@ public interface MailQueueView<V extends 
ManageableMailQueue.MailQueueItemView>
 
     Mono<Void> storeMail(EnqueuedItem enqueuedItem);
 
-    Publisher<Long> delete(DeleteCondition deleteCondition);
+    long delete(DeleteCondition deleteCondition);
 
     Mono<Boolean> isPresent(EnqueueId id);
 
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index 83270db79b..e76df88fa5 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -127,23 +127,25 @@ public class CassandraMailQueueView implements 
MailQueueView<CassandraMailQueueB
     }
 
     @Override
-    public Mono<Long> delete(DeleteCondition deleteCondition) {
+    public long delete(DeleteCondition deleteCondition) {
         if (deleteCondition instanceof DeleteCondition.WithEnqueueId) {
             DeleteCondition.WithEnqueueId enqueueIdCondition = 
(DeleteCondition.WithEnqueueId) deleteCondition;
-            return delete(enqueueIdCondition.getEnqueueId(), 
enqueueIdCondition.getBlobIds())
-                .thenReturn(1L);
+            delete(enqueueIdCondition.getEnqueueId(), 
enqueueIdCondition.getBlobIds()).block();
+            return 1L;
         }
         return browseThenDelete(deleteCondition);
     }
 
-    private Mono<Long> browseThenDelete(DeleteCondition deleteCondition) {
+    private long browseThenDelete(DeleteCondition deleteCondition) {
         return cassandraMailQueueBrowser.browseReferences(mailQueueName)
             .map(EnqueuedItemWithSlicingContext::getEnqueuedItem)
             .filter(deleteCondition::shouldBeDeleted)
             .flatMap(mailReference -> 
cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), 
mailQueueName)
                 
.then(Mono.from(mimeMessageStore.delete(mailReference.getPartsId()))), 
DELETION_CONCURRENCY)
             .count()
-            .doOnNext(ignored -> 
cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName));
+            .doOnNext(ignored -> 
cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName))
+            .subscribeOn(Schedulers.elastic())
+            .block();
     }
 
     private Mono<Void> delete(EnqueueId enqueueId,
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 8fa34fa0d2..f291b1f46d 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -31,7 +31,6 @@ import static 
org.apache.mailet.base.MailAddressFixture.RECIPIENT1;
 import static org.apache.mailet.base.MailAddressFixture.SENDER;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.awaitility.Awaitility.await;
 import static org.awaitility.Durations.TEN_SECONDS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.never;
@@ -41,7 +40,6 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -93,7 +91,6 @@ import org.mockito.ArgumentCaptor;
 
 import com.github.fge.lambdas.Throwing;
 
-import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -379,12 +376,7 @@ class RabbitMQMailQueueTest {
             String name1 = "myMail1";
             String name2 = "myMail2";
             String name3 = "myMail3";
-
-            List<MailQueue.MailQueueItem> receivedItem = new ArrayList<>();
-            Flux.from(getMailQueue().deQueue())
-                    .doOnNext(receivedItem::add)
-                    .subscribe();
-
+            Flux<MailQueue.MailQueueItem> dequeueFlux = 
Flux.from(getMailQueue().deQueue());
             getMailQueue().enQueue(defaultMail()
                 .name(name1)
                 .build());
@@ -407,16 +399,19 @@ class RabbitMQMailQueueTest {
                 .name(name3)
                 .build());
 
-            await().atMost(Duration.ofSeconds(10))
-                    .untilAsserted(() -> assertThat(receivedItem)
-                            .extracting(item -> item.getMail().getName())
-                            .contains(name1, name3));
+            List<MailQueue.MailQueueItem> items = 
dequeueFlux.take(3).collectList().block(Duration.ofSeconds(10));
+
+            assertThat(items)
+                .extracting(item -> item.getMail().getName())
+                .contains(name1, name3);
         }
 
         @Test
         void enqueuedEmailsShouldNotBeLostDuringRabbitMQOutages() throws 
Exception {
             String name = "myMail";
 
+            rabbitMQExtension.getRabbitMQ().pause();
+            Thread.sleep(2000);
 
             try {
                 getMailQueue().enQueue(defaultMail()
@@ -425,7 +420,8 @@ class RabbitMQMailQueueTest {
             } catch (Exception e) {
                 // Ignore
             }
-            rabbitMQExtension.managementAPI().purgeQueue("/", 
"JamesMailQueue-workqueue-spool");
+            rabbitMQExtension.getRabbitMQ().unpause();
+            Thread.sleep(100);
 
             getMailQueue().republishNotProcessedMails(clock.instant().plus(30, 
ChronoUnit.MINUTES)).blockLast();
 
@@ -646,25 +642,14 @@ class RabbitMQMailQueueTest {
         }
 
         private void dequeueMails(int times) {
-            AtomicInteger counter = new AtomicInteger(0);
-            Disposable disposable = Flux.from(getManageableMailQueue()
-                            .deQueue())
-                    .concatMap(mailQueueItem -> Mono.fromCallable(() -> {
-                        if (counter.getAndIncrement() < times) {
-                            mailQueueItem.done(true);
-                            return mailQueueItem;
-                        } else {
-                            mailQueueItem.done(false);
-                            return null;
-                        }
-                    }).subscribeOn(Schedulers.elastic()))
-                    .subscribe();
-
-            try {
-                await().untilAsserted(() -> 
assertThat(counter.get()).isGreaterThanOrEqualTo(times));
-            } finally {
-                disposable.dispose();
-            }
+            Flux.from(getManageableMailQueue()
+                .deQueue())
+                .take(times)
+                .flatMap(mailQueueItem -> Mono.fromCallable(() -> {
+                    mailQueueItem.done(true);
+                    return mailQueueItem;
+                }))
+                .blockLast();
         }
 
         @Test
@@ -776,7 +761,7 @@ class RabbitMQMailQueueTest {
                 .doOnNext(Throwing.consumer(item -> item.done(true)))
                 .subscribe();
 
-            await().atMost(TEN_SECONDS)
+            Awaitility.await().atMost(TEN_SECONDS)
                 .untilAsserted(() -> assertThat(dequeuedMailNames)
                     .containsExactly(name1, name2, name3));
         }
@@ -815,7 +800,7 @@ class RabbitMQMailQueueTest {
                 .doOnNext(Throwing.consumer(item -> item.done(true)))
                 .subscribe();
 
-            await().atMost(TEN_SECONDS)
+            Awaitility.await().atMost(TEN_SECONDS)
                 .untilAsserted(() -> assertThat(dequeuedMailNames)
                     .containsExactly(name1, name2, name3));
         }
@@ -843,9 +828,10 @@ class RabbitMQMailQueueTest {
                 .subscribe();
 
 
-            await().atMost(TEN_SECONDS)
+            Awaitility.await().atMost(TEN_SECONDS)
                 .untilAsserted(() -> 
assertThat(deadLetteredCount.get()).isEqualTo(1));
         }
+
         private void resumeDequeuing(Sender sender) {
             sender.bindQueue(getMailQueueBindingSpecification()).block();
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to