This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 81307f2 JAMES-3687 Clear should remove delayed emails (#830)
81307f2 is described below
commit 81307f217a6694baee2ce900be3602fe7e913a07
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Jan 11 22:04:56 2022 +0700
JAMES-3687 Clear should remove delayed emails (#830)
- Add an explicit test
- Fix the pulsar implementation
---
.../queue/api/DelayedManageableMailQueueContract.java | 16 +++++++++++++++-
.../org/apache/james/queue/pulsar/PulsarMailQueue.scala | 12 +++++++-----
2 files changed, 22 insertions(+), 6 deletions(-)
diff --git
a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java
b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java
index b5a1a11..bbbbf77 100644
---
a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java
+++
b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java
@@ -24,7 +24,6 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.james.junit.ExecutorExtension;
@@ -57,6 +56,21 @@ public interface DelayedManageableMailQueueContract extends
DelayedMailQueueCont
}
@Test
+ default void delayedMessagesShouldBeCleared() throws Exception {
+ getManageableMailQueue().enQueue(defaultMail()
+ .name("name1")
+ .build(),
+ 30L,
+ TimeUnit.SECONDS);
+
+ Awaitility.await().untilAsserted(() ->
assertThat(getManageableMailQueue().getSize()).isEqualTo(1L));
+
+ getManageableMailQueue().clear();
+
+ assertThat(getManageableMailQueue().getSize()).isEqualTo(0L);
+ }
+
+ @Test
default void flushShouldRemoveDelaysWhenImmediateMessageFirst() throws
Exception {
getManageableMailQueue().enQueue(defaultMail()
.name("name1")
diff --git
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index 15d1f33..8a5bfc7 100644
---
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -19,6 +19,10 @@
package org.apache.james.queue.pulsar
+import java.time.{Instant, ZonedDateTime, Duration => JavaDuration}
+import java.util.concurrent.TimeUnit
+import java.util.{Date, UUID}
+
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source,
SourceQueueWithComplete, StreamConverters}
import akka.stream.{Attributes, OverflowStrategy}
@@ -27,6 +31,8 @@ import akka.{Done, NotUsed}
import com.sksamuel.pulsar4s._
import com.sksamuel.pulsar4s.akka.streams
import com.sksamuel.pulsar4s.akka.streams.{CommittableMessage, Control}
+import javax.mail.MessagingException
+import javax.mail.internet.MimeMessage
import org.apache.james.backends.pulsar.{PulsarConfiguration, PulsarReader}
import org.apache.james.blob.api.{BlobId, Store}
import org.apache.james.blob.mail.MimeMessagePartsId
@@ -43,11 +49,6 @@ import org.apache.pulsar.client.api.{Schema,
SubscriptionInitialPosition, Subscr
import org.reactivestreams.Publisher
import play.api.libs.json._
-import java.time.{Instant, ZonedDateTime, Duration => JavaDuration}
-import java.util.concurrent.TimeUnit
-import java.util.{Date, UUID}
-import javax.mail.MessagingException
-import javax.mail.internet.MimeMessage
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor,
Future, Promise}
import scala.jdk.CollectionConverters._
@@ -443,6 +444,7 @@ class PulsarMailQueue(
override def clear(): Long = {
val count = getSize()
admin.topics().delete(outTopic.name, true)
+ admin.topics().delete(scheduledTopic.name, true)
count
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]