This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch quorum-queue-fix-3.7 in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 97bd7e4ab9095b4a588f721c865c57526274c848 Author: Rene Cordier <[email protected]> AuthorDate: Wed Jul 5 10:09:51 2023 +0700 Revert "JAMES-3919 RabbitMQMailQueue: clean up cassandra projection when we c… (#1609) (#1616)" This reverts commit f44d1efc37c26cfabfdc9dfe482698149d5caa78. --- .../backends/rabbitmq/RabbitMQManagementAPI.java | 3 -- .../org/apache/james/queue/rabbitmq/Dequeuer.java | 2 +- .../org/apache/james/queue/rabbitmq/Enqueuer.java | 13 +---- .../james/queue/rabbitmq/RabbitMQMailQueue.java | 4 +- .../queue/rabbitmq/view/api/MailQueueView.java | 3 +- .../view/cassandra/CassandraMailQueueView.java | 12 +++-- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 58 ++++++++-------------- 7 files changed, 34 insertions(+), 61 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java index 826925ef1e..5661a4d314 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java @@ -441,9 +441,6 @@ public interface RabbitMQManagementAPI { @RequestLine(value = "DELETE /api/queues/{vhost}/{name}", decodeSlash = false) void deleteQueue(@Param("vhost") String vhost, @Param("name") String name); - @RequestLine(value = "DELETE /api/queues/{vhost}/{name}/contents", decodeSlash = false) - void purgeQueue(@Param("vhost") String vhost, @Param("name") String name); - @RequestLine(value = "GET /api/exchanges/{vhost}/{name}/bindings/source", decodeSlash = false) List<BindingSource> listBindings(@Param("vhost") String vhost, @Param("name") String name); diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index 671400a06d..573b7a2497 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -140,7 +140,7 @@ class Dequeuer { if (success) { dequeueMetric.increment(); response.ack(); - Mono.from(mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(), mailWithEnqueueId.getBlobIds()))).block(); + mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(), mailWithEnqueueId.getBlobIds())); } else { response.nack(REQUEUE); } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java index b6d97b74fc..b72139c942 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java @@ -25,7 +25,6 @@ import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX; import java.time.Clock; import java.time.Duration; -import java.util.function.Function; import javax.mail.MessagingException; import javax.mail.internet.MimeMessage; @@ -35,7 +34,6 @@ import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; -import org.apache.james.queue.rabbitmq.view.api.DeleteCondition; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser; import org.apache.mailet.Mail; @@ -81,20 +79,11 @@ class Enqueuer { return Flux.mergeDelayError(2, mailQueueView.storeMail(enqueuedItem), publishReferenceToRabbit(mailReference)) - .then() - .onErrorResume(cleanupMailQueueView(enqueueId, mailReference)); + .then(); }).sneakyThrow()) .thenEmpty(Mono.fromRunnable(enqueueMetric::increment)); } - private Function<Throwable, Mono<Void>> cleanupMailQueueView(EnqueueId enqueueId, MailReference mailReference) { - return (Throwable e) -> { - DeleteCondition.WithEnqueueId deleteCondition = DeleteCondition.withEnqueueId(enqueueId, mailReference.getPartsId()); - return Mono.from(mailQueueView.delete(deleteCondition)) - .thenReturn(Mono.<Void>error(e)); - }; - } - Mono<Void> reQueue(CassandraMailQueueBrowser.CassandraMailQueueItemView item) { Mail mail = item.getMail(); return Mono.fromCallable(() -> new MailReference(item.getEnqueuedId(), mail, item.getEnqueuedPartsId())) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java index f311d7bd9a..602e679c33 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java @@ -116,12 +116,12 @@ public class RabbitMQMailQueue implements ManageableMailQueue { @Override public long clear() { - return Mono.from(mailQueueView.delete(DeleteCondition.all())).block(); + return mailQueueView.delete(DeleteCondition.all()); } @Override public long remove(Type type, String value) { - return Mono.from(mailQueueView.delete(DeleteCondition.from(type, value))).block(); + return mailQueueView.delete(DeleteCondition.from(type, value)); } @Override diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java index 7afcf99fc7..921a08bdcd 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java @@ -25,7 +25,6 @@ import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.rabbitmq.EnqueueId; import org.apache.james.queue.rabbitmq.EnqueuedItem; import org.apache.james.queue.rabbitmq.MailQueueName; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -40,7 +39,7 @@ public interface MailQueueView<V extends ManageableMailQueue.MailQueueItemView> Mono<Void> storeMail(EnqueuedItem enqueuedItem); - Publisher<Long> delete(DeleteCondition deleteCondition); + long delete(DeleteCondition deleteCondition); Mono<Boolean> isPresent(EnqueueId id); diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java index 83270db79b..e76df88fa5 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java @@ -127,23 +127,25 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB } @Override - public Mono<Long> delete(DeleteCondition deleteCondition) { + public long delete(DeleteCondition deleteCondition) { if (deleteCondition instanceof DeleteCondition.WithEnqueueId) { DeleteCondition.WithEnqueueId enqueueIdCondition = (DeleteCondition.WithEnqueueId) deleteCondition; - return delete(enqueueIdCondition.getEnqueueId(), enqueueIdCondition.getBlobIds()) - .thenReturn(1L); + delete(enqueueIdCondition.getEnqueueId(), enqueueIdCondition.getBlobIds()).block(); + return 1L; } return browseThenDelete(deleteCondition); } - private Mono<Long> browseThenDelete(DeleteCondition deleteCondition) { + private long browseThenDelete(DeleteCondition deleteCondition) { return cassandraMailQueueBrowser.browseReferences(mailQueueName) .map(EnqueuedItemWithSlicingContext::getEnqueuedItem) .filter(deleteCondition::shouldBeDeleted) .flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), mailQueueName) .then(Mono.from(mimeMessageStore.delete(mailReference.getPartsId()))), DELETION_CONCURRENCY) .count() - .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName)); + .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName)) + .subscribeOn(Schedulers.elastic()) + .block(); } private Mono<Void> delete(EnqueueId enqueueId, diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index 8fa34fa0d2..f291b1f46d 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -31,7 +31,6 @@ import static org.apache.mailet.base.MailAddressFixture.RECIPIENT1; import static org.apache.mailet.base.MailAddressFixture.SENDER; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; -import static org.awaitility.Awaitility.await; import static org.awaitility.Durations.TEN_SECONDS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; @@ -41,7 +40,6 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; @@ -93,7 +91,6 @@ import org.mockito.ArgumentCaptor; import com.github.fge.lambdas.Throwing; -import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -379,12 +376,7 @@ class RabbitMQMailQueueTest { String name1 = "myMail1"; String name2 = "myMail2"; String name3 = "myMail3"; - - List<MailQueue.MailQueueItem> receivedItem = new ArrayList<>(); - Flux.from(getMailQueue().deQueue()) - .doOnNext(receivedItem::add) - .subscribe(); - + Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue()); getMailQueue().enQueue(defaultMail() .name(name1) .build()); @@ -407,16 +399,19 @@ class RabbitMQMailQueueTest { .name(name3) .build()); - await().atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> assertThat(receivedItem) - .extracting(item -> item.getMail().getName()) - .contains(name1, name3)); + List<MailQueue.MailQueueItem> items = dequeueFlux.take(3).collectList().block(Duration.ofSeconds(10)); + + assertThat(items) + .extracting(item -> item.getMail().getName()) + .contains(name1, name3); } @Test void enqueuedEmailsShouldNotBeLostDuringRabbitMQOutages() throws Exception { String name = "myMail"; + rabbitMQExtension.getRabbitMQ().pause(); + Thread.sleep(2000); try { getMailQueue().enQueue(defaultMail() @@ -425,7 +420,8 @@ class RabbitMQMailQueueTest { } catch (Exception e) { // Ignore } - rabbitMQExtension.managementAPI().purgeQueue("/", "JamesMailQueue-workqueue-spool"); + rabbitMQExtension.getRabbitMQ().unpause(); + Thread.sleep(100); getMailQueue().republishNotProcessedMails(clock.instant().plus(30, ChronoUnit.MINUTES)).blockLast(); @@ -646,25 +642,14 @@ class RabbitMQMailQueueTest { } private void dequeueMails(int times) { - AtomicInteger counter = new AtomicInteger(0); - Disposable disposable = Flux.from(getManageableMailQueue() - .deQueue()) - .concatMap(mailQueueItem -> Mono.fromCallable(() -> { - if (counter.getAndIncrement() < times) { - mailQueueItem.done(true); - return mailQueueItem; - } else { - mailQueueItem.done(false); - return null; - } - }).subscribeOn(Schedulers.elastic())) - .subscribe(); - - try { - await().untilAsserted(() -> assertThat(counter.get()).isGreaterThanOrEqualTo(times)); - } finally { - disposable.dispose(); - } + Flux.from(getManageableMailQueue() + .deQueue()) + .take(times) + .flatMap(mailQueueItem -> Mono.fromCallable(() -> { + mailQueueItem.done(true); + return mailQueueItem; + })) + .blockLast(); } @Test @@ -776,7 +761,7 @@ class RabbitMQMailQueueTest { .doOnNext(Throwing.consumer(item -> item.done(true))) .subscribe(); - await().atMost(TEN_SECONDS) + Awaitility.await().atMost(TEN_SECONDS) .untilAsserted(() -> assertThat(dequeuedMailNames) .containsExactly(name1, name2, name3)); } @@ -815,7 +800,7 @@ class RabbitMQMailQueueTest { .doOnNext(Throwing.consumer(item -> item.done(true))) .subscribe(); - await().atMost(TEN_SECONDS) + Awaitility.await().atMost(TEN_SECONDS) .untilAsserted(() -> assertThat(dequeuedMailNames) .containsExactly(name1, name2, name3)); } @@ -843,9 +828,10 @@ class RabbitMQMailQueueTest { .subscribe(); - await().atMost(TEN_SECONDS) + Awaitility.await().atMost(TEN_SECONDS) .untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1)); } + private void resumeDequeuing(Sender sender) { sender.bindQueue(getMailQueueBindingSpecification()).block(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
