This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 81307f2  JAMES-3687 Clear should remove delayed emails (#830)
81307f2 is described below

commit 81307f217a6694baee2ce900be3602fe7e913a07
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Jan 11 22:04:56 2022 +0700

    JAMES-3687 Clear should remove delayed emails (#830)
    
    - Add an explicit test
    - Fix the pulsar implementation
---
 .../queue/api/DelayedManageableMailQueueContract.java    | 16 +++++++++++++++-
 .../org/apache/james/queue/pulsar/PulsarMailQueue.scala  | 12 +++++++-----
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java
index b5a1a11..bbbbf77 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java
@@ -24,7 +24,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.time.Duration;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.james.junit.ExecutorExtension;
@@ -57,6 +56,21 @@ public interface DelayedManageableMailQueueContract extends DelayedMailQueueCont
     }
 
     @Test
+    default void delayedMessagesShouldBeCleared() throws Exception {
+        getManageableMailQueue().enQueue(defaultMail()
+                .name("name1")
+                .build(),
+            30L,
+            TimeUnit.SECONDS);
+
+        Awaitility.await().untilAsserted(() -> assertThat(getManageableMailQueue().getSize()).isEqualTo(1L));
+
+        getManageableMailQueue().clear();
+
+        assertThat(getManageableMailQueue().getSize()).isEqualTo(0L);
+    }
+
+    @Test
     default void flushShouldRemoveDelaysWhenImmediateMessageFirst() throws Exception {
         getManageableMailQueue().enQueue(defaultMail()
                 .name("name1")
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index 15d1f33..8a5bfc7 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -19,6 +19,10 @@
 
 package org.apache.james.queue.pulsar
 
+import java.time.{Instant, ZonedDateTime, Duration => JavaDuration}
+import java.util.concurrent.TimeUnit
+import java.util.{Date, UUID}
+
 import akka.actor.{ActorRef, ActorSystem}
 import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source, SourceQueueWithComplete, StreamConverters}
 import akka.stream.{Attributes, OverflowStrategy}
@@ -27,6 +31,8 @@ import akka.{Done, NotUsed}
 import com.sksamuel.pulsar4s._
 import com.sksamuel.pulsar4s.akka.streams
 import com.sksamuel.pulsar4s.akka.streams.{CommittableMessage, Control}
+import javax.mail.MessagingException
+import javax.mail.internet.MimeMessage
 import org.apache.james.backends.pulsar.{PulsarConfiguration, PulsarReader}
 import org.apache.james.blob.api.{BlobId, Store}
 import org.apache.james.blob.mail.MimeMessagePartsId
@@ -43,11 +49,6 @@ import org.apache.pulsar.client.api.{Schema, SubscriptionInitialPosition, Subscr
 import org.reactivestreams.Publisher
 import play.api.libs.json._
 
-import java.time.{Instant, ZonedDateTime, Duration => JavaDuration}
-import java.util.concurrent.TimeUnit
-import java.util.{Date, UUID}
-import javax.mail.MessagingException
-import javax.mail.internet.MimeMessage
 import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future, Promise}
 import scala.jdk.CollectionConverters._
@@ -443,6 +444,7 @@ class PulsarMailQueue(
   override def clear(): Long = {
     val count = getSize()
     admin.topics().delete(outTopic.name, true)
+    admin.topics().delete(scheduledTopic.name, true)
     count
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to